/[pdpsoft]/trunk/nl.nikhef.pdp.lrmsutils/lrms-pbs/pbsServer.py
ViewVC logotype

Contents of /trunk/nl.nikhef.pdp.lrmsutils/lrms-pbs/pbsServer.py

Parent Directory | Revision Log


Revision 110 - (show annotations) (download) (as text)
Mon Mar 2 15:05:13 2009 UTC (13 years, 5 months ago) by templon
File MIME type: application/x-python
File size: 15643 byte(s)
remove support for pbs hostname arg.

1 #--
2 # pbsServer.py -- LRMS classes for PBS-based batch systems.
3 # $Id: pbsServer.py,v 1.9 2006/04/19 14:16:33 templon Exp $
4 #--
5
6 # Two classes derived from lrms.Server are included; one for
7 # working with a live PBS server, and one for representing historical
8 # states of the server from PBS accounting log files. A third class
9 # (History) is associated with the historical server class.
10
11 from lrms import * # base Server class, Job class
12 import commands
13 import grp
14 import pwd
15 import re
16 import string
17 import sys
18 import time
19
class LiveServer(Server):

    """Snapshot of a live PBS server, built from ``qstat -f`` output.

    Slot totals come from ``torque_utils.pbsnodes()``; one Job is added to
    the server for every job record found in the qstat output.

    Keyword arguments:
      file -- if given and not None, read the ``qstat -f`` text from this
              file instead of running ``qstat -f``.
    """

    # qstat job_state letter -> lrms state name; anything else is 'unknown'
    _STATE_NAMES = {
        'Q': 'queued', 'W': 'queued',
        'R': 'running', 'E': 'running',
        'H': 'pending', 'T': 'pending',
        'C': 'done',
    }

    def __init__(self, *arg, **kw):

        Server.__init__(self)

        cnow = time.ctime()  # this hack is to get around time zone problems
        qstatout = self._read_qstat(kw.get('file'))
        self._count_slots()

        nowtuple = time.strptime(cnow, "%c")
        self.__evtime__ = int(time.mktime(nowtuple))

        # job records in qstat -f output are separated by blank lines
        for record in qstatout.split("\n\n"):
            newj = self._parse_job(record)
            jobid = newj.get('jobid')
            # A chunk without a "Job Id" line (the empty output of an idle
            # server, a trailing blank chunk, or qstat error text) is not a
            # job; adding it would create a phantom job with no jobid.
            if not jobid:
                continue
            self.addjob(jobid, newj)

    def _read_qstat(self, qfile):
        # Return the qstat -f text, taken from qfile if given, else from qstat.
        if qfile is None:
            # exit status is ignored, as before; on failure the output holds
            # no job records and so no jobs are added
            (stat, output) = commands.getstatusoutput('qstat -f')
            return output
        # Read the file directly instead of running '/bin/cat <name>' through
        # the shell, which broke on names with spaces or shell metacharacters.
        inf = open(qfile, 'r')
        try:
            return inf.read()
        finally:
            inf.close()

    def _count_slots(self):
        # Set slotsUp / slotsFree from the CPUs and jobs of nodes that are up.
        from torque_utils import pbsnodes
        cpucount = 0
        jobcount = 0
        for node in pbsnodes():
            if node.isUp():
                cpucount += node.numCpu
                for cpu in range(node.numCpu):
                    jobcount += len(node.jobs[cpu])
        self.slotsUp = cpucount
        self.slotsFree = cpucount - jobcount

    @staticmethod
    def _hms_to_secs(hms):
        # Convert an 'HH:MM:SS' string to seconds (a float, as before).
        t = hms.split(':')
        return int(t[2]) + 60.0 * (int(t[1]) + 60 * int(t[0]))

    def _parse_job(self, record):
        # Build a Job from one blank-line-separated qstat -f job record.
        newj = Job()
        # attribute lines inside a record are indented, so splitting on
        # newline+space yields one attribute per piece
        for piece in record.split('\n '):
            thisline = piece.strip()
            if thisline.startswith('Job Id'):
                newj.set('jobid', thisline.split()[2])
            elif thisline.startswith('Job_Owner'):
                user = thisline.split()[2].split('@')[0]
                newj.set('user', user)
                try:
                    groupname = grp.getgrgid(pwd.getpwnam(user)[3])[0]
                except KeyError:
                    # user, or its primary group, unknown on this host
                    groupname = 'unknown'
                newj.set('group', groupname)
            elif thisline.startswith('job_state'):
                statelett = thisline.split()[2]
                newj.set('state', self._STATE_NAMES.get(statelett, 'unknown'))
            elif thisline.startswith('queue ='):
                newj.set('queue', thisline.split()[2])
            elif thisline.startswith('qtime ='):
                timestring = thisline.split('=')[1][1:]
                timetuple = time.strptime(timestring, "%c")
                newj.set('qtime', time.mktime(timetuple))
            elif thisline.startswith('Resource_List.walltime'):
                newj.set('maxwalltime',
                         self._hms_to_secs(thisline.split()[2]))
            elif thisline.startswith('resources_used.walltime'):
                # start time = snapshot time minus walltime used so far
                secs = self._hms_to_secs(thisline.split()[2])
                newj.set('start', self.__evtime__ - secs)
            elif thisline.startswith('Job_Name'):
                newj.set('name', thisline.split()[2])
            elif thisline.startswith('exec_host'):
                # hosts are '+'-separated; one entry per allocated CPU slot
                hlist = thisline.split('=')[1]
                newj.set('cpucount', hlist.count('+') + 1)
        return newj
106
107 import copy
108
## Helper for class Event: splits a string of whitespace-separated
## key=value pairs into a dict mapping each key to its value.
## Example input string:
## user=tdykstra group=niktheorie jobname=Q11_241828.gjob
113
def keyval2dict(astring):
    """Split a string of whitespace-separated key=value fields into a dict.

    Only the first '=' of each field separates key from value, so values
    that themselves contain '=' (e.g. ``Resource_List.nodes=1:ppn=2``) are
    kept whole instead of raising.  An empty string yields an empty dict.

    Raises CantHappenException (not defined in this part of the file;
    presumably supplied by ``from lrms import *``) if a field has no '='.
    """
    d = {}
    for field in astring.split():
        kv = field.split('=', 1)
        if len(kv) != 2:
            raise CantHappenException
        d[kv[0]] = kv[1]
    return d
124
class Event:

    """One record parsed from a PBS accounting log line.

    The line is expected to look like
    ``<timestamp>;<type letter>;<jobid>;<key=value ...>`` with the timestamp
    in ``%m/%d/%Y %H:%M:%S`` form.  If the line does not match, a message is
    printed and time(), type() and jobid() return None while info() returns
    an empty dict.
    """

    def __init__(self, evstring, debug=0):

        # defaults, left in place when the line cannot be parsed
        self.__time__ = None
        self.__type__ = None
        self.__jobid__ = None
        self.__info__ = {}

        # groups: 1) timestamp 2) event type (Q,S,E,D, etc)
        #         3) local PBS jobID 4) rest of line (key=value pairs)
        m = re.search("^(.+);([A-Z]);(.+);(.*)", evstring)
        if not m:
            print("parse patt failed, offending line is")
            print(evstring)
            return
        if debug:
            print("timestamp " + m.group(1))
            print("code " + m.group(2))
            print("jobid " + m.group(3))
            print("attrs " + m.group(4))

        ttup = time.strptime(m.group(1), "%m/%d/%Y %H:%M:%S")

        # The last element of a time tuple is the DST flag, but PBS log
        # files don't specify a time zone.  Setting it to -1 asks libc to
        # work it out from the local time zone of this machine.
        atup = ttup[:8] + (-1,)
        self.__time__ = int(time.mktime(atup))
        self.__type__ = m.group(2)
        self.__jobid__ = m.group(3)
        self.__info__ = keyval2dict(m.group(4))

    def time(self):
        # event time as integer seconds since the epoch, or None
        return self.__time__

    def type(self):
        # one-letter event code ('Q', 'S', 'E', 'D', 'T', ...), or None
        return self.__type__

    def jobid(self):
        return self.__jobid__

    def info(self, key=''):
        # whole key=value dict when no key is given, else that value or None
        if key == '':
            return self.__info__
        return self.__info__.get(key)

    def __str__(self):
        # str() so that an unparsed event (all fields None) can be printed
        return str(self.__jobid__) + ' :: event type ' + \
            str(self.__type__) + ' at ' + str(self.__time__)
187
class History:

    """Job catalog and event lists built from PBS accounting log files.

    Three structures are kept:
      * the job catalog: one Job per jobid seen, holding queue entry time,
        start/end time, owner user/group, queue and max walltime;
      * the event list: every parsed Event, in log order;
      * the job event dict: jobid -> list of that job's Events, in order,
        which helps resolve ambiguous cases such as several 'D' events.
    Together these are meant to let the queue state be rebuilt at any time.
    """

    ## functions for using the job catalog (jobid -> Job)

    def addjob(self, jobid, job):  # add new job entry
        if self.hasjob(jobid):
            print("job already found, exiting")
            sys.exit(1)
        job.set('jobid', jobid)
        self.__jobcat__[jobid] = job

    def hasjob(self, jobid):  # 1 if job is already in catalogue, else 0
        if jobid in self.__jobcat__:
            return 1
        return 0

    def setjobinfo(self, jobid, key, val):  # set info for existing job
        if not self.hasjob(jobid):
            print("job not found: %s %s %s" % (jobid, key, val))
            return 0
        self.__jobcat__[jobid].set(key, val)
        return 1

    def getjobinfo(self, jobid, key):  # get info for key from job jobid
        if not self.hasjob(jobid):
            print("job not found: %s %s" % (jobid, key))
            return 0
        return self.__jobcat__[jobid].get(key)

    def getjoblist(self):
        # always a real list, since callers sort it in place
        return list(self.__jobcat__.keys())

    def getjob(self, jobid):
        return self.__jobcat__[jobid]

    ## functions for using the event list (all events, in sequence)

    def getfirst_event_time(self):
        return self.__evlist__[0].time()

    def getevent(self, index):
        return self.__evlist__[index]

    def get_evlist(self):
        return self.__evlist__

    ## functions for using the job event dict (jobid -> events, in sequence)

    def getjobevts(self, jobid):
        if jobid not in self.__jobevs__:
            print("getjobevs: jobid %s not found in job_event db" % (jobid,))
            sys.exit(1)
        return self.__jobevs__[jobid]

    def addevt(self, ev):
        # append to the event list and to this job's own event list
        self.__evlist__.append(ev)
        self.__jobevs__.setdefault(ev.jobid(), []).append(ev)

    def __init__(self, logfilelist):

        self.__evlist__ = []
        self.__jobcat__ = {}
        self.__jobevs__ = {}

        for f in logfilelist:
            inf = open(f, 'r')
            try:
                for line in inf:
                    self._addline(line)
            finally:
                inf.close()

    def _addline(self, line):
        # Parse one log line and fold it into the event lists and job catalog.

        # Strip only the line terminator; slicing off the last character
        # also ate real data when a file did not end with a newline.
        line = line.rstrip('\r\n')
        if not line:
            return                      # blank line: nothing to record
        ev = Event(line)                # parse line, create event
        if ev.type() is None:
            # unparseable (Event has printed it); recording it would add a
            # catalog entry keyed on None and an event with no time
            return
        self.addevt(ev)                 # adds event to evlist and jobevt struct

        if ev.type() == 'Q':            # job enters sys for 1st time
            if self.hasjob(ev.jobid()): # should not happen
                print("Error, Q event seen for pre-existing job %s"
                      % (ev.jobid(),))
                sys.exit(1)
            newentry = Job()            # make new entry, fill info
            newentry.set('qtime', ev.time())
            newentry.set('queue', ev.info('queue'))
            self.addjob(ev.jobid(), newentry)
            return

        ## for all other event types
        if not self.hasjob(ev.jobid()):
            self.addjob(ev.jobid(), Job())
        job = self.getjob(ev.jobid())
        if ev.info('qtime'):
            job.set('qtime', int(ev.info('qtime')))
        if ev.info('queue') and not job.get('queue'):
            job.set('queue', ev.info('queue'))
        if ev.info('user') and not job.get('user'):
            job.set('user', ev.info('user'))
        if ev.info('group') and not job.get('group'):
            job.set('group', ev.info('group'))
        if ev.info('start') and not job.get('start'):
            job.set('start', int(ev.info('start')))
        if ev.info('Resource_List.walltime') and not job.get('maxwalltime'):
            hmsflds = ev.info('Resource_List.walltime').split(":")
            maxwallsecs = int(hmsflds[0]) * 3600 + \
                int(hmsflds[1]) * 60 + \
                int(hmsflds[2])
            job.set('maxwalltime', maxwallsecs)
        if ev.info('end') and not job.get('end'):
            job.set('end', int(ev.info('end')))

        ## special handling for troublesome event types

        if ev.type() == 'T':            # job restart after checkpoint
            # The previous start record may not have been seen; if not, we
            # don't know whether the job was running or queued before, so
            # use this event's time as start since it is running now.
            if not job.get('start'):
                job.set('start', ev.time())
            # The T record may be the first (or only) record for this job;
            # then all we know is that qtime precedes the log files.  A later
            # record for the job will set the correct qtime.
            if not job.get('qtime'):
                job.set('qtime', self.getfirst_event_time() - 1)
336
class LogFileServer(Server):

    """Server whose state is replayed event by event from a History.

    The constructor loads every job queued before the first logged event,
    marked 'running' if it started before that event and 'queued' otherwise.
    step() then applies the logged events one at a time.
    """

    def __init__(self, history, debug=0):

        Server.__init__(self)

        self.__history__ = history
        # the first getnextevent call should return event 0, so start the
        # index one before it
        self.__evindx__ = -1
        first_event = history.get_evlist()[0]
        if debug:
            print('first event data:')
            print('%s %s %s' % (first_event.jobid(), first_event.type(),
                                first_event.time()))
        starttime = first_event.time()
        jobidlist = list(history.getjoblist())
        jobidlist.sort()
        if debug:
            print(jobidlist)
        for jid in jobidlist:
            entry = history.getjob(jid)
            qtime = entry.get('qtime')
            # A job queued at or after the first event is added by its own
            # Q event in step().  Jobids sort as strings, not by qtime, so
            # such a job must be skipped, not treated as the end of the list
            # (breaking here dropped earlier-queued jobs that sort later).
            # A missing qtime counts as "before the logs".
            queued_later = qtime is not None and qtime >= starttime
            if debug:
                print('%s %s %s' % (jid, qtime, starttime))
                print(queued_later)
            if queued_later:
                continue

            # this job was queued before the first event; it is either
            # queued or running
            job = copy.deepcopy(entry)
            job.set('jobid', jid)
            if job.get('start') and job.get('start') < starttime:
                job.set('state', 'running')
            else:
                job.set('state', 'queued')

            self.__jobdict__[jid] = job

    ## getnextevent shows you what the next event will be; step actually
    ## changes the server state to reflect that event.  Both are needed so
    ## ETT for an event can be calculated BEFORE the job enters the queue.

    def getnextevent(self):  # return the next event without applying it
        return self.__history__.getevent(self.__evindx__ + 1)

    def step(self):  # apply the next event and return it
        self.__evindx__ = self.__evindx__ + 1
        ev = self.__history__.getevent(self.__evindx__)
        self.__evtime__ = ev.time()
        evtype = ev.type()

        # take care of implications for queue states
        if evtype == 'Q':
            jobcopy = copy.deepcopy(self.__history__.getjob(ev.jobid()))
            jobcopy.set('state', 'queued')
            jobcopy.set('jobid', ev.jobid())
            self.addjob(ev.jobid(), jobcopy)
        elif evtype == 'S':
            self.getjob(ev.jobid()).set('state', 'running')
        elif evtype == 'T':
            job = self.getjob(ev.jobid())
            job.set('state', 'running')
            job.set('start', ev.time())
        elif evtype == 'E':
            self.deletejob(ev.jobid())
        elif evtype == 'D':
            # a job may log several D events; only the last one removes it
            jevtl = self.__history__.getjobevts(ev.jobid())
            if jevtl[-1] == ev:
                self.deletejob(ev.jobid())
        return ev
409

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28