/[pdpsoft]/nl.nikhef.pdp.dynsched-pbs-plugin/trunk/pbsServer.py
ViewVC logotype

Contents of /nl.nikhef.pdp.dynsched-pbs-plugin/trunk/pbsServer.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2013 - (show annotations) (download) (as text)
Fri Oct 8 13:12:27 2010 UTC (11 years, 11 months ago) by templon
File MIME type: application/x-python
File size: 18074 byte(s)
change HeadURL to URL

1 #--
2 # pbsServer.py -- LRMS classes for PBS-based batch systems.
3 # $Revision: 1939 $
4 # $URL$
5 # $Date: 2010-09-23 16:07:57 +0200 (Thu, 23 Sep 2010) $
6 # $Id$
7
8 #--
9
# Two classes derived from lrms.Server are included: LiveServer, for
# working with a live PBS server, and LogFileServer, for representing
# historical states of the server from PBS accounting log files. A third
# class (History) collects the jobs and events from those log files for
# LogFileServer to replay, and a helper class (Event) parses a single
# accounting-log record.
14
15 from lrms import * # base Server class, Job class
16 import commands
17 import grp
18 import pwd
19 import re
20 import string
21 import sys
22 import time
23
def _hms2secs(hms):
    """Convert a PBS duration string 'HH:MM:SS' to seconds.

    Returns a float (the hours/minutes part is scaled by 60.0), which is the
    type LiveServer has always stored for 'walltime' and 'maxwalltime'.
    """
    t = hms.split(':')
    return int(t[2]) + 60.0 * (int(t[1]) + 60 * int(t[0]))


class LiveServer(Server):
    """Snapshot of a live PBS/Torque server.

    Slot counts come from torque_utils.pbsnodes(); the job list comes from
    'qstat -f' output (or from a saved copy of that output, see __init__).
    """

    # qstat -f job_state letters mapped to the state names stored on Job;
    # any letter not listed (or a missing job_state) becomes 'unknown'.
    _STATE_NAMES = {
        'Q': 'queued', 'W': 'queued',
        'R': 'running', 'E': 'running',
        'H': 'pending', 'T': 'pending',
        'C': 'done',
    }

    # Attribute lines of a job section are indented: split on a newline
    # followed by any run of spaces so the exact indent width does not
    # matter. Wrapped values continue on '\n\t' lines, which are not split.
    _ATTR_SPLIT = re.compile(r'\n +')
    _KEYVAL = re.compile(r'(\S+) = (.*)')

    def __init__(self, *arg, **kw):
        """Build the snapshot.

        Keyword 'file': if given and not None, that file's contents (read
        with /bin/cat) are parsed instead of running 'qstat -f'.
        Positional arguments are accepted and ignored.
        """
        Server.__init__(self)

        cnow = time.ctime()  # this hack is to get around time zone problems
        if kw.get('file') is not None:
            cmdstr = '/bin/cat ' + kw['file']
        else:
            cmdstr = 'qstat -f'
        # NOTE(review): the exit status is not checked; error text in the
        # output will most likely end in the "fatal error" exit of _parseJob.
        qstatout = commands.getstatusoutput(cmdstr)[1]

        from torque_utils import pbsnodes
        cpucount = 0
        jobcount = 0
        for node in pbsnodes():
            if node.isUp():
                cpucount += node.numCpu
                for cpu in range(node.numCpu):
                    jobcount += len(node.jobs[cpu])
        self.slotsUp = cpucount
        self.slotsFree = cpucount - jobcount

        nowtuple = time.strptime(cnow, "%c")
        self.__evtime__ = int(time.mktime(nowtuple))

        # Job sections are separated by blank lines. With no jobs the output
        # is empty, so skip whitespace-only sections rather than treating
        # them as an unparsable job.
        for section in qstatout.split("\n\n"):
            if not section.strip():
                continue
            newj = self._parseJob(section)
            self.addjob(newj.get('jobid'), newj)

    def _parseJob(self, section):
        """Turn one job section of 'qstat -f' output into a Job.

        Exits the process with status 3 on an attribute line that is not
        'key = value', and with status 4 when neither euser/egroup nor
        Job_Owner is present.
        """
        newj = Job()
        qstatDict = {}
        for ll in self._ATTR_SPLIT.split(section):
            if ll.startswith('Job Id'):
                newj.set('jobid', ll.split()[2])
                continue
            l = ll.replace('\n\t', '')  # re-join wrapped values
            mk = self._KEYVAL.match(l)
            if not mk:
                print("fatal error, should always be able to find a match")
                print("unmatchable string was: %s" % (l,))
                sys.exit(3)
            qstatDict[mk.group(1)] = mk.group(2)

        # owner and group: prefer euser/egroup; otherwise parse Job_Owner
        # ("user@host") and look up the user's primary group locally.
        if 'euser' in qstatDict and 'egroup' in qstatDict:
            newj.set('user', qstatDict['euser'])
            newj.set('group', qstatDict['egroup'])
        elif 'Job_Owner' in qstatDict:
            user = qstatDict['Job_Owner'].split('@')[0]
            newj.set('user', user)
            try:
                groupname = grp.getgrgid(pwd.getpwnam(user)[3])[0]
            except KeyError:
                # user or its primary gid is unknown on this host
                groupname = 'unknown'
            newj.set('group', groupname)
        else:
            print("Can't find user and group of job %s" % (newj.get('jobid'),))
            sys.exit(4)

        newj.set('state',
                 self._STATE_NAMES.get(qstatDict.get('job_state'), 'unknown'))
        newj.set('queue', qstatDict['queue'])

        if 'qtime' in qstatDict:
            newj.set('qtime',
                     time.mktime(time.strptime(qstatDict['qtime'], "%c")))
        if 'Resource_List.walltime' in qstatDict:
            newj.set('maxwalltime',
                     _hms2secs(qstatDict['Resource_List.walltime']))
        used = None
        if 'resources_used.walltime' in qstatDict:
            used = _hms2secs(qstatDict['resources_used.walltime'])
            newj.set('walltime', used)

        # start time: use start_time when present, otherwise derive it from
        # the snapshot time minus the elapsed walltime. Without either (e.g.
        # a queued job) no start is set, instead of one computed from the
        # requested walltime or from the previous job's values.
        if 'start_time' in qstatDict:
            newj.set('start',
                     time.mktime(time.strptime(qstatDict['start_time'], "%c")))
            newj.set('startAnchor', 'start_time')
        elif used is not None:
            newj.set('start', self.__evtime__ - used)
            newj.set('startAnchor', 'resources_used.walltime')

        if 'Job_Name' in qstatDict:
            newj.set('name', qstatDict['Job_Name'])
        if 'exec_host' in qstatDict:
            # one '+'-separated exec_host entry per allocated slot
            newj.set('cpucount', qstatDict['exec_host'].count('+') + 1)
        return newj
150
151 import copy
152
def keyval2dict(astring):
    """Parse a whitespace-separated string of key=val pairs into a dict.

    Helper for class Event, superseded by keyvallist2dict (a value that
    contains whitespace cannot survive the whitespace split done here).
    Example input:
        user=tdykstra group=niktheorie jobname=Q11_241828.gjob
    Each token is split at its first '=' only. A token without '=' is
    printed (the token, then its split) and CantHappenException is raised;
    that name is not defined in this file (presumably it comes in via
    `from lrms import *`).
    """
    result = {}
    for token in astring.split():
        pair = token.split("=", 1)
        if len(pair) != 2:
            print(token)
            print(pair)
            raise CantHappenException
        result[pair[0]] = pair[1]
    return result
170
def keyvallist2dict(kvlist):
    """Convert a list of 'key=val' strings into a dict (helper for Event).

    Example input:
        ['user=tdykstra', 'group=niktheorie', 'jobname=Q11_241828.gjob']
    Each item is split at its first '=' only, so values may themselves
    contain '='; a later duplicate key overwrites an earlier one. An item
    without '=' is reported on stdout and CantHappenException is raised
    (not defined in this file; presumably it arrives via `from lrms import *`).
    """
    pairs = {}
    for item in kvlist:
        parts = item.split("=", 1)
        if len(parts) != 2:
            print("tried to split: %s , result was: %s" % (item, parts))
            raise CantHappenException
        key, val = parts
        pairs[key] = val
    return pairs
186
class Event:
    """One parsed record (line) of a PBS accounting log file.

    A record has the layout
        <timestamp>;<type letter>;<jobid>;<key=val key=val ...>
    with the timestamp in "%m/%d/%Y %H:%M:%S" form. The parsed fields are
    exposed through time(), type(), jobid() and info(). If a line does not
    match this layout a diagnostic is printed and time/type/jobid stay None
    and info() stays empty.
    """

    # groups: 1) timestamp 2) event type (Q,S,E,D, etc) 3) local PBS jobID
    # 4) rest of line (the key=value list). Compiled once, not per line.
    _EVPROG = re.compile("^(.+);([A-Z]);(.+);(.*)")

    # matches one key=val pair. The lookahead is needed because some values
    # contain ' ' and '=' (like account, or neednodes with multiple
    # processors): a value is taken lazily up to end of string or up to the
    # next " key=". NOTE: ' -@' inside the value class is a RANGE (space
    # through '@'), so it admits the digits and most ASCII punctuation.
    _TPROG = re.compile(r'[a-z._A-Z]+=[a-z0-9A-Z=/: -@_]+?(?=$| [a-z._A-Z]+=)')

    def __init__(self, evstring, debug=0):
        """Parse evstring, one log line without its trailing newline.

        debug: when true, print the intermediate parse results.
        """
        self.__time__ = None  # defaults, kept if the line cannot be parsed
        self.__type__ = None
        self.__jobid__ = None
        self.__info__ = {}

        m = self._EVPROG.search(evstring)
        if not m:
            print("parse patt failed, offending line is")
            print(evstring)
            return
        if debug:
            print("timestamp %s" % (m.group(1),))
            print("code %s" % (m.group(2),))
            print("jobid %s" % (m.group(3),))
            print("attrs %s" % (m.group(4),))

        tmatch = self._TPROG.findall(m.group(4))
        if debug:
            print("result of key=val match pattern: %s" % (tmatch,))

        ttup = time.strptime(m.group(1), "%m/%d/%Y %H:%M:%S")
        # PBS log files don't specify a time zone. Setting the DST flag (last
        # element of the time tuple) to -1 asks libc to figure it out from
        # the local time zone of the machine.
        self.__time__ = int(time.mktime(ttup[:8] + (-1,)))
        self.__type__ = m.group(2)
        self.__jobid__ = m.group(3)
        self.__info__ = keyvallist2dict(tmatch)

    def time(self):
        """Event time in epoch seconds (None if the line did not parse)."""
        return self.__time__

    def type(self):
        """Single-letter event type (None if the line did not parse)."""
        return self.__type__

    def jobid(self):
        """PBS job id of the record (None if the line did not parse)."""
        return self.__jobid__

    def info(self, key=''):
        """Return the whole attribute dict, or the value for key (None if absent)."""
        if key == '':
            return self.__info__
        return self.__info__.get(key)

    def __str__(self):
        # %s formatting also copes with the None fields of an unparsed line,
        # where string concatenation raised TypeError
        return '%s :: event type %s at %s' % (self.__jobid__, self.__type__,
                                              self.__time__)
260
class History:
    """Jobs and events collected from a list of PBS accounting log files.

    Three structures are built while parsing:
      * the job catalog: dict jobid -> Job, holding things like queue entry
        time, start time, end time, owner user/group name, queue, etc.;
      * the event list: every parsed Event, in the order seen in the files;
      * the job event dict: jobid -> list of that job's Events, in order.
        It helps resolve ambiguous cases (like multiple "D" events seen).
    Together these are meant to be enough to regenerate the state of the
    queue at any given time.
    """

    ## job catalog: one entry (an object of class Job) per job seen in the
    ## log files.

    def addjob(self, jobid, job):  # add new job entry
        if self.hasjob(jobid):
            print("job already found, exiting")
            sys.exit(1)
        job.set('jobid', jobid)
        self.__jobcat__[jobid] = job

    def hasjob(self, jobid):  # test if job is already in catalogue
        # direct dict membership: 'in d.keys()' scans a list in Python 2
        if jobid in self.__jobcat__:
            return 1
        return 0

    def setjobinfo(self, jobid, key, val):  # add or set info to/for existing job
        if not self.hasjob(jobid):
            print("job not found: %s %s %s" % (jobid, key, val))
            return 0
        self.__jobcat__[jobid].set(key, val)
        return 1

    def getjobinfo(self, jobid, key):  # get info for key from job jobid
        if not self.hasjob(jobid):
            print("job not found: %s %s" % (jobid, key))
            return 0
        return self.__jobcat__[jobid].get(key)

    def getjoblist(self):
        """Return a new list of all catalogued job ids (callers may sort it)."""
        return list(self.__jobcat__.keys())

    def getjob(self, jobid):
        return self.__jobcat__[jobid]

    ## event list: all parsed events, in sequence.

    def getfirst_event_time(self):
        return self.__evlist__[0].time()

    def getevent(self, index):
        return self.__evlist__[index]

    ## job event dict: jobid -> list of events seen for this job.

    def getjobevts(self, jobid):
        if jobid not in self.__jobevs__:
            print('getjobevs: jobid %s not found in job_event db' % (jobid,))
            sys.exit(1)
        return self.__jobevs__[jobid]

    def addevt(self, ev):
        """Append ev to the event list and to its job's event list."""
        self.__evlist__.append(ev)
        self.__jobevs__.setdefault(ev.jobid(), []).append(ev)

    def __init__(self, logfilelist):
        """Parse every file in logfilelist, in the given order."""
        self.__evlist__ = []
        self.__jobcat__ = {}
        self.__jobevs__ = {}

        for f in logfilelist:
            inf = open(f, 'r')
            try:
                for line in inf:
                    self._addline(line)
            finally:
                inf.close()

    def _addline(self, line):
        """Parse one log line and merge it into the three structures."""
        if line.endswith('\n'):  # only chop a newline that is really there
            line = line[:-1]
        ev = Event(line)
        if ev.type() is None:
            # Event already printed a diagnostic. An unparsed record has no
            # jobid or time; keeping it would create a catalog entry under
            # None and could become the "first event" with no time.
            return
        self.addevt(ev)
        jid = ev.jobid()

        if ev.type() == 'Q':  # job enters the system for the 1st time
            if self.hasjob(jid):  # should not happen
                print('Error, Q event seen for pre-existing job %s' % (jid,))
                sys.exit(1)
            newentry = Job()
            newentry.set('qtime', ev.time())
            newentry.set('queue', ev.info('queue'))
            self.addjob(jid, newentry)
            return

        ## all other event types: create the entry if needed, then fill in
        ## whatever this record knows and the entry does not have yet
        ## (qtime is always overwritten when the record carries one).
        if not self.hasjob(jid):
            self.addjob(jid, Job())
        job = self.getjob(jid)
        if ev.info('qtime'):
            job.set('qtime', int(ev.info('qtime')))
        for key in ('queue', 'user', 'group'):
            if ev.info(key) and not job.get(key):
                job.set(key, ev.info(key))
        if ev.info('start') and not job.get('start'):
            job.set('start', int(ev.info('start')))
        if ev.info('Resource_List.walltime') and not job.get('maxwalltime'):
            (h, m, s) = ev.info('Resource_List.walltime').split(":")
            job.set('maxwalltime', int(h) * 3600 + int(m) * 60 + int(s))
        if ev.info('end') and not job.get('end'):
            job.set('end', int(ev.info('end')))

        if ev.type() == 'T':  # job restart after checkpoint
            ## the previous start record may not have been seen; we then
            ## don't know whether it was running or queued before, but it is
            ## running now, so use this event's time as start.
            if not job.get('start'):
                job.set('start', ev.time())
            ## the T record may be the first (or only) record for the job:
            ## all we know is that it was queued before the logs begin. A
            ## later record for the job will set the correct qtime.
            if not job.get('qtime'):
                job.set('qtime', self.getfirst_event_time() - 1)

    def get_evlist(self):
        return self.__evlist__
407
class LogFileServer(Server):
    """Server state replayed from a History, one accounting-log event at a time.

    The initial state contains every catalogued job queued before the first
    logged event (state 'running' if it also started before then, otherwise
    'queued'); step() then applies the logged events in order.
    """

    def __init__(self, history, debug=0):
        Server.__init__(self)

        self.__history__ = history
        # the first getnextevent()/step() should yield event 0, so start the
        # index one before it
        self.__evindx__ = -1
        first_event = history.get_evlist()[0]
        if debug:
            print('first event data:')
            print('%s %s %s' % (first_event.jobid(), first_event.type(),
                                first_event.time()))
        starttime = first_event.time()
        jobidlist = sorted(history.getjoblist())
        if debug:
            print(jobidlist)
        for jid in jobidlist:
            entry = history.getjob(jid)
            qtime = entry.get('qtime')
            # a job without a qtime is treated as queued before the logs
            # start (as the old None >= number comparison did in Python 2)
            queued_later = qtime is not None and qtime >= starttime
            if debug:
                print('%s %s %s' % (jid, qtime, starttime))
                print(queued_later)
            if queued_later:
                # Skip, don't stop: job ids sort as strings ("1000.x" before
                # "999.x"), which is not qtime order, so breaking here could
                # drop jobs queued before the logs. Presumably such a job's
                # Q event is in the logs and step() adds it then.
                continue

            # every job reaching this point was queued before the first
            # event, so it is either queued or running, nothing else
            job = copy.deepcopy(entry)
            job.set('jobid', jid)
            if job.get('start') and job.get('start') < starttime:
                job.set('state', 'running')
            else:
                job.set('state', 'queued')

            self.__jobdict__[jid] = job

    ## getnextevent shows you what the next event will be. step actually
    ## changes the server state to reflect what happened in that event.
    ## you need the two since you want to calculate ETT for an event
    ## BEFORE you actually submit the job to the queue!

    def getnextevent(self):  # return the next event
        return self.__history__.getevent(self.__evindx__ + 1)

    def step(self):  # step to next event, apply it, and return it
        self.__evindx__ += 1
        ev = self.__history__.getevent(self.__evindx__)
        self.__evtime__ = ev.time()
        etype = ev.type()
        jid = ev.jobid()

        # take care of implications for queue states
        if etype == 'Q':
            jobcopy = copy.deepcopy(self.__history__.getjob(jid))
            jobcopy.set('state', 'queued')
            jobcopy.set('jobid', jid)
            self.addjob(jid, jobcopy)
        elif etype in ('S', 'T'):
            job = self.getjob(jid)
            job.set('state', 'running')
            if etype == 'T':  # restart: it (re)starts at this event's time
                job.set('start', ev.time())
        elif etype == 'E':
            self.deletejob(jid)
        elif etype == 'D':
            # only the last delete event seen for the job removes it
            if self.__history__.getjobevts(jid)[-1] is ev:
                self.deletejob(jid)
        return ev
480

Properties

Name Value
svn:eol-style native
svn:keywords Id URL

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28