1 |
#-- |
2 |
# pbsServer.py -- LRMS classes for PBS-based batch systems. |
3 |
# $Id: pbsServer.py,v 1.9 2006/04/19 14:16:33 templon Exp $ |
4 |
#-- |
5 |
|
6 |
# Two classes derived from lrms.Server are included; one for |
7 |
# working with a live PBS server, and one for representing historical |
8 |
# states of the server from PBS accounting log files. A third class |
9 |
# (History) is associated with the historical server class. |
10 |
|
11 |
from lrms import * # base Server class, Job class |
12 |
import commands |
13 |
import grp |
14 |
import pwd |
15 |
import re |
16 |
import string |
17 |
import sys |
18 |
import time |
19 |
|
20 |
# Map a qstat 'job_state' letter to this module's generic job state names;
# any other letter becomes 'unknown'.
_QSTAT_STATE = {
    'Q': 'queued', 'W': 'queued',
    'R': 'running', 'E': 'running',
    'H': 'pending', 'T': 'pending',
    'C': 'done',
}


def _hms_to_seconds(hms):
    """Convert a qstat 'HH:MM:SS' duration string to seconds.

    Returns a float (minutes/hours are scaled by 60.0), matching the values
    LiveServer has always stored for walltime-derived fields.
    """
    flds = hms.split(':')
    return int(flds[2]) + 60.0 * (int(flds[1]) + 60 * int(flds[0]))


class LiveServer(Server):
    """Current state of a live PBS/Torque server.

    Slot counts come from torque_utils.pbsnodes(); jobs come from parsing
    ``qstat -f`` output (or a file holding such output).
    """

    def __init__(self, *arg, **kw):
        """Query the batch system and fill in slot counts, event time and jobs.

        Keyword arguments:
          file -- if present and not None, this file is read (via /bin/cat)
                  instead of running ``qstat -f``.
        Any other arguments are accepted and ignored.
        """
        Server.__init__(self)

        # this hack is to get around time zone problems: the current local
        # time is round-tripped through ctime/strptime/mktime below
        cnow = time.ctime()
        qfile = kw.get('file')
        if qfile is not None:
            cmdstr = '/bin/cat ' + qfile
        else:
            cmdstr = 'qstat -f'
        # The exit status is not checked; output without 'Job Id' records
        # (e.g. an error message) simply yields no jobs below.
        qstatout = commands.getstatusoutput(cmdstr)[1]

        from torque_utils import pbsnodes
        cpucount = 0
        jobcount = 0
        for node in pbsnodes():
            # only nodes that are up contribute slots (and busy slots)
            if node.isUp():
                cpucount += node.numCpu
                # node.jobs is indexed by CPU number; presumably each entry
                # lists the jobs on that CPU -- TODO confirm in torque_utils
                for cpu in range(node.numCpu):
                    jobcount += len(node.jobs[cpu])
        self.slotsUp = cpucount
        self.slotsFree = cpucount - jobcount

        nowtuple = time.strptime(cnow, "%c")
        self.__evtime__ = int(time.mktime(nowtuple))

        # job records in the qstat output are separated by blank lines
        for record in qstatout.split("\n\n"):
            newj = self._job_from_qstat(record)
            jobid = newj.get('jobid')
            if not jobid:
                # Empty output (no jobs) or an error message has no 'Job Id'
                # line; don't register a phantom job keyed by None.
                continue
            self.addjob(jobid, newj)

    def _job_from_qstat(self, record):
        """Build a Job from one job record of ``qstat -f`` output.

        Reads self.__evtime__ (which must already be set) to turn the
        walltime used so far into a start time.
        """
        newj = Job()
        for raw in record.split('\n '):
            thisline = raw.strip()
            if thisline.startswith('Job Id'):
                newj.set('jobid', thisline.split()[2])
            elif thisline.startswith('Job_Owner'):
                # value is user@host; keep only the user part
                user = thisline.split()[2].split('@')[0]
                newj.set('user', user)
                try:
                    gid = pwd.getpwnam(user)[3]
                    groupname = grp.getgrgid(gid)[0]
                except KeyError:
                    # user or primary group not known on this host
                    groupname = 'unknown'
                newj.set('group', groupname)
            elif thisline.startswith('job_state'):
                letter = thisline.split()[2]
                newj.set('state', _QSTAT_STATE.get(letter, 'unknown'))
            elif thisline.startswith('queue ='):
                newj.set('queue', thisline.split()[2])
            elif thisline.startswith('qtime ='):
                # everything after '= ' is a ctime-style timestamp
                timestring = thisline.split('=')[1][1:]
                timetuple = time.strptime(timestring, "%c")
                newj.set('qtime', time.mktime(timetuple))
            elif thisline.startswith('Resource_List.walltime'):
                newj.set('maxwalltime', _hms_to_seconds(thisline.split()[2]))
            elif thisline.startswith('resources_used.walltime'):
                # start time = now minus the walltime used so far
                secs = _hms_to_seconds(thisline.split()[2])
                newj.set('start', self.__evtime__ - secs)
            elif thisline.startswith('Job_Name'):
                newj.set('name', thisline.split()[2])
            elif thisline.startswith('exec_host'):
                # one CPU is counted per '+'-separated host entry
                hlist = thisline.split('=')[1]
                newj.set('cpucount', hlist.count('+') + 1)
        return newj
106 |
|
107 |
import copy |
108 |
|
109 |
## following is helper for class Event.
def keyval2dict(astring):
    """Parse a whitespace-separated string of key=value pairs into a dict.

    Example input:
        user=tdykstra group=niktheorie jobname=Q11_241828.gjob

    Each field is split at its FIRST '=' only, so values that themselves
    contain '=' (e.g. ``Resource_List.nodes=1:ppn=2``) are kept intact.
    If a key appears more than once, the last value wins.

    Raises CantHappenException for a field with no '=' at all.
    NOTE(review): CantHappenException is not defined in this file; presumably
    it comes from the ``from lrms import *`` -- TODO confirm.
    """
    d = {}
    for fld in astring.split():
        kv = fld.split('=', 1)
        if len(kv) != 2:
            raise CantHappenException
        d[kv[0]] = kv[1]
    return d
124 |
|
125 |
class Event:
    """One record from a PBS accounting log (job queued, started, ended, ...).

    A record looks like ``<timestamp>;<type>;<jobid>;<key=val ...>``.  If a
    record cannot be parsed, a message is printed and time/type/jobid stay
    None while info is an empty dict.
    """

    # Groups: 1) timestamp  2) one-letter event type (Q, S, E, D, ...)
    # 3) local PBS job id  4) rest of line (key=value pairs).
    # The first three fields cannot contain ';', so a ';' inside the message
    # text stays in group 4 instead of being swallowed into the job id (the
    # old greedy '(.+)' pattern split at the LAST ';' on the line).
    _EVPATT = re.compile("^([^;]+);([A-Z]);([^;]+);(.*)")

    def __init__(self, evstring, debug=0):
        """Parse evstring; print the groups found when debug is true."""
        self.__time__ = None      # defaults, kept if parsing fails
        self.__type__ = None
        self.__jobid__ = None
        self.__info__ = {}

        m = self._EVPATT.search(evstring)
        if not m:
            print("parse patt failed, offending line is")
            print(evstring)
            return
        if debug:
            print("timestamp " + m.group(1))
            print("code " + m.group(2))
            print("jobid " + m.group(3))
            print("attrs " + m.group(4))

        ttup = time.strptime(m.group(1), "%m/%d/%Y %H:%M:%S")
        # last element of the time tuple is the DST flag, but PBS log files
        # don't specify a time zone; -1 asks the C library to work it out
        # from the local time zone of the machine
        atup = ttup[:8] + (-1,)
        self.__time__ = int(time.mktime(atup))
        self.__type__ = m.group(2)
        self.__jobid__ = m.group(3)
        self.__info__ = keyval2dict(m.group(4))

    def time(self):
        """Return the event time in integer epoch seconds, or None."""
        return self.__time__

    def type(self):
        """Return the one-letter event type code, or None."""
        return self.__type__

    def jobid(self):
        """Return the PBS job id string, or None."""
        return self.__jobid__

    def info(self, key=''):
        """Return the whole key=value dict if key is '', else that key's value.

        A key that is not present gives None.
        """
        if key == '':
            return self.__info__
        return self.__info__.get(key)

    def __str__(self):
        # %s formatting so an unparsed event (None fields) still prints
        return '%s :: event type %s at %s' % (
            self.__jobid__, self.__type__, self.__time__)
187 |
|
188 |
class History:
    """Job catalog and event lists built from PBS accounting log files.

    Three structures are kept:
      * the job catalog: jobid -> Job, holding queue entry time, start and
        end times, owner user/group name, etc. for each job seen;
      * the event list: every parsed event, in the order seen.  It is meant
        to be the minimal information needed to rebuild the state of the
        queues at an arbitrary time;
      * the job event lists: jobid -> that job's events, in order.  These
        help resolve ambiguous cases such as several "D" events for one job.
    """

    def __init__(self, logfilelist):
        """Parse the accounting log files in logfilelist, in the given order.

        Lines that cannot be parsed are reported (by Event) and skipped.
        Exits the program (status 1) if a Q event is seen for a job that is
        already in the catalog.
        """
        self.__evlist__ = []   # all events, in order
        self.__jobcat__ = {}   # jobid -> Job
        self.__jobevs__ = {}   # jobid -> list of that job's events

        for fname in logfilelist:
            inf = open(fname, 'r')
            try:
                for line in inf:
                    self._process_line(line)
            finally:
                inf.close()

    def _process_line(self, line):
        """Parse one log line and fold its event into all three structures."""
        # Remove only a real trailing newline: the last line of a file may
        # lack one, and blindly chopping a character would corrupt it.
        if line.endswith('\n'):
            line = line[:-1]
        ev = Event(line)
        if ev.jobid() is None:
            # unparseable line (Event already printed it); recording it would
            # add an event with no time/type and a job keyed by None
            return
        self.addevt(ev)
        if ev.type() == 'Q':   # job enters the system for the first time
            self._record_queued(ev)
        else:
            self._merge_event_info(ev)

    def _record_queued(self, ev):
        """Create the catalog entry for a job's Q (queued) event."""
        if self.hasjob(ev.jobid()):   # should not happen
            print('Error, Q event seen for pre-existing job %s' % (ev.jobid(),))
            sys.exit(1)
        newentry = Job()
        newentry.set('qtime', ev.time())
        newentry.set('queue', ev.info('queue'))
        self.addjob(ev.jobid(), newentry)

    def _merge_event_info(self, ev):
        """Fold a non-Q event's key=value info into the job's catalog entry.

        Creates the entry if this is the first record seen for the job.
        'qtime' is overwritten whenever the event carries one; the other
        fields are only filled in when not already set.
        """
        if not self.hasjob(ev.jobid()):
            self.addjob(ev.jobid(), Job())
        job = self.getjob(ev.jobid())

        if ev.info('qtime'):
            job.set('qtime', int(ev.info('qtime')))
        for key in ('queue', 'user', 'group'):
            if ev.info(key) and not job.get(key):
                job.set(key, ev.info(key))
        if ev.info('start') and not job.get('start'):
            job.set('start', int(ev.info('start')))
        walltime = ev.info('Resource_List.walltime')
        if walltime and not job.get('maxwalltime'):
            hmsflds = walltime.split(':')
            job.set('maxwalltime', int(hmsflds[0]) * 3600 +
                    int(hmsflds[1]) * 60 + int(hmsflds[2]))
        if ev.info('end') and not job.get('end'):
            job.set('end', int(ev.info('end')))

        # special handling for troublesome event types
        if ev.type() == 'T':   # job restart after checkpoint
            # The previous start record may not have been seen; then we don't
            # know whether the job was running or queued before, so use the
            # current event time as start since we know it's running now.
            if not job.get('start'):
                job.set('start', ev.time())
            # The T record may be the first (or only) record seen for the
            # job; all we can say then is that it was queued before the logs
            # begin.  A later record carrying qtime will correct this.
            if not job.get('qtime'):
                job.set('qtime', self.getfirst_event_time() - 1)

    ## functions for using the job catalog

    def addjob(self, jobid, job):
        """Add a new catalog entry; exits the program if jobid is present."""
        if self.hasjob(jobid):
            print("job already found, exiting")
            sys.exit(1)
        job.set('jobid', jobid)
        self.__jobcat__[jobid] = job

    def hasjob(self, jobid):
        """Return 1 if jobid is in the catalog, else 0."""
        if jobid in self.__jobcat__:
            return 1
        return 0

    def setjobinfo(self, jobid, key, val):
        """Set key=val on an existing job; return 1, or 0 if jobid unknown."""
        if not self.hasjob(jobid):
            print('job not found: %s %s %s' % (jobid, key, val))
            return 0
        self.__jobcat__[jobid].set(key, val)
        return 1

    def getjobinfo(self, jobid, key):
        """Return the job's value for key, or 0 if jobid is unknown."""
        if not self.hasjob(jobid):
            print('job not found: %s %s' % (jobid, key))
            return 0
        return self.__jobcat__[jobid].get(key)

    def getjoblist(self):
        """Return a new list of all jobids in the catalog."""
        return list(self.__jobcat__.keys())

    def getjob(self, jobid):
        """Return the catalog Job for jobid (KeyError if unknown)."""
        return self.__jobcat__[jobid]

    ## functions for using the event list

    def getfirst_event_time(self):
        """Return the time of the first recorded event (IndexError if none)."""
        return self.__evlist__[0].time()

    def getevent(self, index):
        """Return the event at position index of the event list."""
        return self.__evlist__[index]

    def get_evlist(self):
        """Return the event list itself (not a copy)."""
        return self.__evlist__

    ## functions for using the per-job event lists

    def getjobevts(self, jobid):
        """Return jobid's events in order; exits the program if unknown."""
        if jobid not in self.__jobevs__:
            print('getjobevs: jobid %s not found in job_event db' % (jobid,))
            sys.exit(1)
        return self.__jobevs__[jobid]

    def addevt(self, ev):
        """Append ev to the event list and to its job's event list."""
        self.__evlist__.append(ev)
        self.__jobevs__.setdefault(ev.jobid(), []).append(ev)
336 |
|
337 |
class LogFileServer(Server):
    """Replays a History as a sequence of server states, one event at a time.

    The initial state holds every catalog job not queued at or after the
    first logged event: 'running' if it started before then, else 'queued'.
    Use getnextevent() to peek at the next event and step() to apply it.
    """

    def __init__(self, history, debug=0):
        """Build the initial state from history (IndexError if it has no events)."""
        Server.__init__(self)

        self.__history__ = history
        # want to get first event on first getnextevent call, so set initial
        # index to -1 (one previous to 0!)
        self.__evindx__ = -1
        first_event = history.get_evlist()[0]
        if debug:
            print('first event data:')
            print('%s %s %s' % (first_event.jobid(), first_event.type(),
                                first_event.time()))
        starttime = first_event.time()
        jobidlist = list(history.getjoblist())
        jobidlist.sort()
        if debug:
            print(jobidlist)
        for jid in jobidlist:
            entry = history.getjob(jid)
            qtime = entry.get('qtime')
            # A missing qtime counts as "before the start", as the old
            # Python 2 comparison None >= number (False) did.
            queued_later = qtime is not None and qtime >= starttime
            if debug:
                print('%s %s %s' % (jid, qtime, starttime))
                print(queued_later)
            if queued_later:
                # This job enters later via its own Q event in step().  Keep
                # scanning: jobids are sorted as strings, which is not qtime
                # order (e.g. '1000.srv' < '999.srv'), so stopping here could
                # drop jobs that were already in the system.
                continue

            # the job was in the system before the first event, so it can
            # only be queued or running at that point
            job = copy.deepcopy(entry)
            job.set('jobid', jid)
            start = job.get('start')
            if start and start < starttime:
                job.set('state', 'running')
            else:
                job.set('state', 'queued')
            self.__jobdict__[jid] = job

    ## getnextevent shows you what the next event will be. step actually
    ## changes the server state to reflect what happened in that event.
    ## you need the two since you want to calculate ETT for an event
    ## BEFORE you actually submit the job to the queue!

    def getnextevent(self):
        """Return the next event from the history without applying it."""
        return self.__history__.getevent(self.__evindx__ + 1)

    def step(self):
        """Advance to the next event, apply it to the job states, return it."""
        self.__evindx__ += 1
        ev = self.__history__.getevent(self.__evindx__)
        self.__evtime__ = ev.time()

        etype = ev.type()
        if etype == 'Q':
            # a private copy, so state changes don't touch the history catalog
            jobcopy = copy.deepcopy(self.__history__.getjob(ev.jobid()))
            jobcopy.set('state', 'queued')
            jobcopy.set('jobid', ev.jobid())
            self.addjob(ev.jobid(), jobcopy)
        elif etype == 'S':
            self.getjob(ev.jobid()).set('state', 'running')
        elif etype == 'T':
            job = self.getjob(ev.jobid())
            job.set('state', 'running')
            job.set('start', ev.time())
        elif etype == 'E':
            self.deletejob(ev.jobid())
        elif etype == 'D':
            # a job may have several D events; only the last one removes it
            if self.__history__.getjobevts(ev.jobid())[-1] is ev:
                self.deletejob(ev.jobid())
        return ev
409 |
|