1 |
#!/usr/bin/python2
# $Id$
# Source: $URL$
# J. A. Templon, NIKHEF/PDP 2011

# purpose: update rrd databases for ndpf group views
# program runs qstat, parses the output to find number of waiting and
# running jobs per group; this is added to an RRD database

DEBUG = 1

import os
import tempfile
import time

# Use mkstemp instead of the race-prone, deprecated mktemp: the scratch
# file is created atomically.  We only need the path (the shell redirect
# below truncates and rewrites it), so close the descriptor right away.
fd, qsfname = tempfile.mkstemp(".txt", "qsmf", os.environ['HOME'] + "/tmp")
os.close(fd)

TORQUE = "stro.nikhef.nl"                    # torque server to query
DATADIR = os.environ['HOME'] + '/ndpfdata/'  # where the .rrd files live
WAITDB = DATADIR + 'ndpfwaiting.rrd'
RUNDB = DATADIR + 'ndpfrunning.rrd'

# snapshot the full job list and remember when we took it
os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)
now = time.mktime(time.localtime())
23 |
|
24 |
# parse the qstat snapshot into a list of per-job attribute dicts
import torqueJobs
jlist = torqueJobs.qs_parsefile(qsfname)

# keep the raw snapshot around under a fixed name for debugging
os.system('mv %s %s/tmp/qstat.last.txt' % (qsfname, os.environ['HOME']))
28 |
|
29 |
import torqueAttMappers as tam

# job attributes we keep, with the sqlite column type for each
usedkeys = {'egroup': 'varchar', 'jobid': 'varchar', 'queue': 'varchar',
            'job_state': 'varchar', 'euser': 'varchar', 'qtime': 'float',
            'start_time': 'float', 'comp_time': 'float',
            'exec_host': 'integer'}

# attributes renamed on the way into the table
amap = {'exec_host': 'corect'}

# sqlite schema: same types as usedkeys, with the amap renames applied
sqlitekeys = dict((amap.get(k, k), usedkeys[k]) for k in usedkeys.keys())
43 |
|
44 |
def mapatts(indict):
    """Reduce one raw qstat job dict to the columns we store.

    Keeps only the keys listed in usedkeys, converts time-like fields
    (those in tam.tfl2 -- presumably timestamp strings; tconv turns them
    into seconds, TODO confirm in torqueAttMappers), collapses job_state
    letters to 'queued'/'running', counts cores from exec_host, applies
    the amap renames, and drops any remaining falsy values.
    """
    sdict = tam.sub_dict(indict, usedkeys.keys())
    odict = dict()
    for key in sdict.keys():
        value = sdict[key]
        if key in tam.tfl2 and value:
            # convert time-like attribute to seconds
            value = tam.tconv(value)
        elif key == 'job_state':
            # Q/W -> queued, R/E -> running; other letters pass through
            if value in ['Q', 'W']:
                value = 'queued'
            elif value in ['R', 'E']:
                value = 'running'
        elif key == 'exec_host' and value:
            # exec_host is a '+'-separated host list; count the slots
            value = value.count('+') + 1
        if value:
            # NOTE(review): falsy values (0, '', None) are dropped here
            odict[amap.get(key, key)] = value
    return odict
67 |
|
68 |
# apply the attribute mapping to every job in the snapshot
newjlist = [mapatts(j) for j in jlist]
71 |
|
72 |
# NOTE(review): this is the legacy pysqlite 1.x module ("sqlite", not the
# stdlib sqlite3) -- confirm it is still available on the target host.
import sqlite
# throwaway in-memory database; used only for the group-by queries below
cx = sqlite.connect(":memory:")
cu = cx.cursor()
75 |
|
76 |
# build the table-creation statement from the schema map and create it
columns = ','.join('%s %s' % (k, sqlitekeys[k]) for k in sqlitekeys.keys())
crstr = 'create table jobs (%s)' % columns
cu.execute(crstr)
88 |
|
89 |
# insert all the jobs found into the table
for jd in newjlist:
    # column list and repr()-quoted value list for this job
    kl = jd.keys()
    cols = ','.join(kl)
    vals = ','.join(repr(jd[k]) for k in kl)
    cu.execute('insert into jobs (%s) values (%s)' % (cols, vals))
110 |
|
111 |
# count queued jobs per group, plus a grand total
queuedjobs = dict()
cu.execute('select egroup,count(*) from jobs where job_state="queued" group by egroup')
for row in cu.fetchall():
    queuedjobs[row[0]] = row[1]
queuedjobs['total'] = sum(queuedjobs.values())
123 |
|
124 |
import rrdtool
import glob

# loop over all known queued databases and update; with the counted
# value if the group was seen, zero otherwise.
for db in glob.glob(DATADIR + '*.queued.rrd'):
    group = db[len(DATADIR):db.find('.queued.rrd')]
    val = queuedjobs.get(group, 0)
    rrdtool.update(db, 'N:' + repr(val))
135 |
|
136 |
# do it again for the running jobs |
137 |
|
138 |
runningjobs = dict() |
139 |
selstr = 'select egroup,sum(corect) from jobs where job_state="running" group by egroup' |
140 |
cu.execute(selstr) |
141 |
rl = cu.fetchall() |
142 |
for r in rl: |
143 |
runningjobs[r[0]] = r[1] |
144 |
|
145 |
rtot = sum(runningjobs.values()) |
146 |
totslots = int(open(DATADIR+'nslots').read()) |
147 |
if totslots < rtot: |
148 |
print "Total slots from nslots file lower than total running jobs" |
149 |
print totslots, "<", rtot |
150 |
print "Setting total slots to total running jobs" |
151 |
totslots = rtot |
152 |
offlineslots = int(open(DATADIR+'oslots').read()) |
153 |
runningjobs['offline'] = offlineslots |
154 |
runningjobs['unused'] = totslots - offlineslots - rtot |
155 |
runningjobs['total'] = totslots |
156 |
|
157 |
# update every running-jobs database; zero for groups not seen this round
for db in glob.glob(DATADIR + '*.running.rrd'):
    group = db[len(DATADIR):db.find('.running.rrd')]
    val = runningjobs.get(group, 0)
    rrdtool.update(db, 'N:' + repr(val))
165 |
|
166 |
# now do longest wait times: per group, age of the oldest queued job
for group in queuedjobs.keys():
    cu.execute('select qtime from jobs where job_state="queued" '
               'and egroup="' + group + '"')
    rows = cu.fetchall()
    if rows:
        # oldest qtime gives the longest wait
        waitt = now - min(r[0] for r in rows)
    else:
        waitt = 2
    rrdtool.update(DATADIR + group + '.waittime.rrd',
                   'N:' + repr(int(waitt)))
184 |
|
185 |
# rollover : avg core turnover in last 60 seconds; jobs completing! |
186 |
|
187 |
selstr = 'select comp_time from jobs where comp_time<>"None" and comp_time > (' +\ |
188 |
repr(now) + ' - 60)' |
189 |
cu.execute(selstr) |
190 |
rl = cu.fetchall() |
191 |
if len(rl) > 0: |
192 |
stl = list() |
193 |
for r in rl: |
194 |
stl.append(r[0]) |
195 |
rollover = 60.0/len(stl) |
196 |
else: |
197 |
selstr = 'select comp_time from jobs where comp_time<>"None"' |
198 |
cu.execute(selstr) |
199 |
rl = cu.fetchall() |
200 |
if len(rl) > 1: |
201 |
stl = list() |
202 |
for r in rl: |
203 |
stl.append(r[0]) |
204 |
stl.sort() |
205 |
rollover = (now - stl[-2])/2. |
206 |
elif len(rl) == 1: |
207 |
rollover = now - rl[0][0] |
208 |
if DEBUG: print 'used time since last start' |
209 |
else: |
210 |
rollover = 600 # if nothing seen, all we know is, not in last ten minutes (keep_completed) |
211 |
if DEBUG: |
212 |
print 'set to 600, no comp_times seen' |
213 |
for tj in newjlist: |
214 |
print tj |
215 |
|
216 |
# print "len(stl), rollover:", len(stl), rollover |
217 |
rrdtool.update(DATADIR + "rollover" + '.waittime.rrd', |
218 |
'N:' + repr(rollover)) |
219 |
|
220 |
# lastroll : last recorded start in system
cu.execute('select start_time from jobs where start_time<>"None"')
starts = cu.fetchall()
if starts:
    # age of the most recent start, clamped to at least 1 second
    lastroll = now - max(r[0] for r in starts)
    if lastroll < 1:
        lastroll = 1
else:
    lastroll = 6000  # signal that no starts are recorded in system.

rrdtool.update(DATADIR + "lastroll" + '.waittime.rrd',
               'N:' + repr(int(lastroll)))

cx.close()
241 |
|
242 |
### Local Variables: *** |
243 |
### mode: python *** |
244 |
### End: *** |
245 |
|