#!/usr/bin/python2
# $Id$
# Source: $URL$
# J. A. Templon, NIKHEF/PDP 2011

# purpose: update rrd databases for ndpf group views
# program runs qstat, parses the output to find number of waiting and
# running jobs per group; this is added to an RRD database

DEBUG = 1

import os
import tempfile
import time

# mkstemp (unlike the insecure, deprecated mktemp) creates the file
# atomically, so no other process can race us for the name; we only
# need the path for the shell redirect below, so close the fd at once.
_fd, qsfname = tempfile.mkstemp(".txt", "qsmf", os.environ['HOME'] + "/tmp")
os.close(_fd)

TORQUE = "korf.nikhef.nl"
DATADIR = os.environ['HOME'] + '/ndpfdata/'
WAITDB = DATADIR + 'ndpfwaiting.rrd'
RUNDB = DATADIR + 'ndpfrunning.rrd'

# old line for stro: os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)

# timestamp of the snapshot; also used later for wait-time arithmetic
now = time.mktime(time.localtime())
os.system('/opt/torque4/bin/qstat-torque -f @' + TORQUE + ' > ' + qsfname)

import torqueJobs
jlist = torqueJobs.qs_parsefile(qsfname)

# keep the raw qstat output around for post-mortem debugging
os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')
import torqueAttMappers as tam

# qstat attributes we keep, with the sqlite column type used to store each
usedkeys = {'egroup' : 'varchar', 'jobid' : 'varchar', 'queue' : 'varchar', 'job_state' : 'varchar',
            'euser' : 'varchar', 'qtime' : 'float', 'start_time' : 'float',
            'comp_time' : 'float', 'exec_host' : 'integer' }
# attribute renames: the exec_host string is reduced to a core count later
amap = {
    'exec_host' : 'corect'
}

# column name -> type for the sqlite table, renames from amap applied
sqlitekeys = dict()
for attname, atttype in usedkeys.items():
    sqlitekeys[amap.get(attname, attname)] = atttype
def mapatts(indict):
    """Reduce a raw qstat job dict to the attributes we store.

    Time-like fields are converted to seconds via tam.tconv, the
    job_state letter is folded to 'queued'/'running', and exec_host
    becomes a core count (number of '+'-separated entries).  Falsy
    values are dropped; keys are renamed according to amap.
    """
    sdict = tam.sub_dict(indict, usedkeys.keys())
    odict = dict()
    for k in sdict.keys():
        val = sdict[k]
        if k in tam.tfl2 and val:
            # timestamp-like attribute: convert to seconds
            val = tam.tconv(val)
        elif k == 'job_state':
            if val in ['Q', 'W']:
                val = 'queued'
            elif val in ['R', 'E']:
                val = 'running'
        elif k == 'exec_host' and val:
            # exec_host lists one '+'-separated slot per core
            val = val.count('+') + 1
        if val:
            odict[amap.get(k, k)] = val
    return odict
# map every parsed job into its reduced, renamed form
newjlist = [mapatts(j) for j in jlist]

import sqlite3 as sqlite
# throwaway in-memory database; it only lives for this run
cx = sqlite.connect(":memory:")
cu = cx.cursor()

# build table creation string and create the table
coldefs = ["%s %s" % (col, typ) for col, typ in sqlitekeys.items()]
crstr = 'create table jobs (' + ','.join(coldefs) + ')'
cu.execute(crstr)
# insert all the jobs found into the table.  Use sqlite parameter
# binding instead of splicing repr() of each value into the SQL text:
# a value containing a quote character can no longer break (or inject
# into) the statement, and numbers keep their native type.
for jd in newjlist:
    kl = jd.keys()
    insstr = ('insert into jobs (' + ','.join(kl) +
              ') values (' + ','.join(['?'] * len(kl)) + ')')
    # print insstr
    cu.execute(insstr, [jd[k] for k in kl])
# find the list of all queued jobs vs. group

queuedjobs = dict()

cu.execute('select egroup,count(*) from jobs where job_state="queued" group by egroup')
for egroup, njobs in cu.fetchall():
    queuedjobs[egroup] = njobs

queuedjobs['total'] = sum(queuedjobs.values())

import rrdtool
import glob
# loop over all known databases and update; with val if known, zero otherwise.
for db in glob.glob(DATADIR + '*.queued.rrd'):
    # group name is the filename between DATADIR and the .queued.rrd suffix
    group = db[len(DATADIR):db.find('.queued.rrd')]
    rrdtool.update(db, 'N:' + repr(queuedjobs.get(group, 0)))
139 |
# do it again for the running jobs |
140 |
|
141 |
runningjobs = dict() |
142 |
selstr = 'select egroup,sum(corect) from jobs where job_state="running" group by egroup' |
143 |
cu.execute(selstr) |
144 |
rl = cu.fetchall() |
145 |
# debug |
146 |
# print selstr |
147 |
for r in rl: |
148 |
if r[1] == None: |
149 |
print "Check for jobs from group %s with no exec host" % r[0] |
150 |
else: |
151 |
runningjobs[r[0]] = r[1] |
152 |
|
153 |
rtot = sum(runningjobs.values()) |
154 |
totslots = int(open(DATADIR+'nslots').read()) |
155 |
if totslots < rtot: |
156 |
print "Total slots from nslots file lower than total running jobs" |
157 |
print totslots, "<", rtot |
158 |
print "Setting total slots to total running jobs" |
159 |
totslots = rtot |
160 |
offlineslots = int(open(DATADIR+'oslots').read()) |
161 |
runningjobs['offline'] = offlineslots |
162 |
runningjobs['unused'] = totslots - offlineslots - rtot |
163 |
runningjobs['total'] = totslots |
164 |
|
165 |
running_files = glob.glob(DATADIR+'*.running.rrd') |
166 |
for db in running_files: |
167 |
group = db[len(DATADIR):db.find('.running.rrd')] |
168 |
if group in runningjobs.keys(): |
169 |
val = runningjobs[group] |
170 |
else: |
171 |
val = 0 |
172 |
rrdtool.update(db, 'N:' + repr(val)) |
173 |
|
# now do longest wait times

for group in queuedjobs.keys():
    # parameter binding instead of concatenating the group name into the
    # SQL text: a group name containing a quote can no longer break the
    # statement
    cu.execute('select qtime from jobs where job_state="queued" '
               'and egroup=?', (group,))
    rl = cu.fetchall()
    if rl:
        # age of the oldest queued job in this group
        waitt = now - min(r[0] for r in rl)
    else:
        # no queued rows (e.g. the synthetic 'total' key): token value
        waitt = 2

    db = (DATADIR + group + '.waittime.rrd').encode('ascii') # UTF-8 gives rrdtool update err

    rrdtool.update(db, 'N:' + repr(int(waitt)))
194 |
# rollover : avg core turnover in last 60 seconds; jobs completing! |
195 |
|
196 |
#selstr_dbg = 'SELECT jobid, comp_time, typeof(comp_time) from jobs' + \ |
197 |
# ' where comp_time <> "None"' |
198 |
#cu.execute(selstr_dbg) |
199 |
#rl = cu.fetchall() |
200 |
#print rl |
201 |
|
202 |
selstr = 'select comp_time from jobs where comp_time<>"None" ' + \ |
203 |
'and comp_time > ' + repr(now-60) |
204 |
|
205 |
cu.execute(selstr) |
206 |
rl = cu.fetchall() |
207 |
|
208 |
# print selstr |
209 |
# print rl |
210 |
|
211 |
if len(rl) > 0: |
212 |
stl = list() |
213 |
for r in rl: |
214 |
stl.append(r[0]) |
215 |
rollover = 60.0/len(stl) |
216 |
else: |
217 |
selstr = 'select comp_time from jobs where comp_time<>"None"' |
218 |
cu.execute(selstr) |
219 |
rl = cu.fetchall() |
220 |
if len(rl) > 1: |
221 |
stl = list() |
222 |
for r in rl: |
223 |
stl.append(r[0]) |
224 |
stl.sort() |
225 |
rollover = (now - stl[-2])/2. |
226 |
elif len(rl) == 1: |
227 |
rollover = now - rl[0][0] |
228 |
if DEBUG: print 'used time since last start' |
229 |
else: |
230 |
rollover = 600 # if nothing seen, all we know is, not in last ten minutes (keep_completed) |
231 |
if DEBUG: |
232 |
print 'set to 600, no comp_times seen' |
233 |
for tj in newjlist: |
234 |
print tj |
235 |
|
236 |
# print "len(stl), rollover:", len(stl), rollover |
237 |
rrdtool.update(DATADIR + "rollover" + '.waittime.rrd', |
238 |
'N:' + repr(rollover)) |
239 |
|
# lastroll : last recorded start in system

cu.execute('select start_time from jobs where start_time<>"None"')
starts = [r[0] for r in cu.fetchall()]
if starts:
    # seconds since the most recent recorded start, clipped below at 1
    lastroll = max(now - max(starts), 1)
else:
    lastroll = 6000  # signal that no starts are recorded in system.

rrdtool.update(DATADIR + "lastroll" + '.waittime.rrd',
               'N:' + repr(int(lastroll)))

cx.close()

### Local Variables: ***
### mode: python ***
### End: ***