#!/usr/bin/python2
# $Id$
# Source: $URL$
# J. A. Templon, NIKHEF/PDP 2011

# purpose: update the RRD databases for the ndpf group views.
# The program runs qstat, parses its output to find the number of waiting and
# running jobs per group, and adds these counts to the RRD databases.
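#
# rough flow (as implemented below): dump `qstat -f` to a temp file, parse it
# into per-job attribute dicts, load those into an in-memory SQLite table, then
# derive per-group counts and wait times and push them into the RRD files.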

import os
import tempfile

qsfname = tempfile.mktemp(".txt", "qsmf", os.environ['HOME'] + "/tmp")

TORQUE = "stro.nikhef.nl"
DATADIR = os.environ['HOME'] + '/ndpfdata/'
WAITDB = DATADIR + 'ndpfwaiting.rrd'
RUNDB = DATADIR + 'ndpfrunning.rrd'
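
# DATADIR is expected to hold one RRD per group and metric (<group>.queued.rrd,
# <group>.running.rrd, <group>.waittime.rrd, plus rollover.waittime.rrd); these
# are discovered by glob and updated below.  WAITDB and RUNDB are defined here
# but not referenced further in this script.  A minimal creation sketch for one
# of the per-group files, assuming a single GAUGE data source (the step and RRA
# layout are guesses, not taken from this script):
#   rrdtool create atlas.queued.rrd --step 300 \
#       DS:queued:GAUGE:600:0:U RRA:AVERAGE:0.5:1:2016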

# dump the full qstat output for all jobs on the Torque server to a temp file
os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)

import torqueJobs
jlist = torqueJobs.qs_parsefile(qsfname)
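
# qs_parsefile is assumed to return a list with one attribute dict per job,
# containing (at least) the keys named in usedkeys below; a hypothetical entry:
#   {'jobid': '1234567.stro.nikhef.nl', 'euser': 'tuser', 'egroup': 'atlas',
#    'queue': 'atlas', 'job_state': 'R',
#    'qtime': 'Tue Mar  1 10:00:00 2011', 'start_time': 'Tue Mar  1 10:05:00 2011'}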

# keep the most recent qstat dump around as qstat.last.txt
os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')

import torqueAttMappers as tam

usedkeys = {'egroup' : 'varchar', 'jobid' : 'varchar', 'queue' : 'varchar', 'job_state' : 'varchar',
            'euser' : 'varchar', 'qtime' : 'float', 'start_time' : 'float' }

def mapatts(indict):
    """Keep only the attributes listed in usedkeys, convert the time-like
    fields listed in tam.tfl2 to seconds with tam.tconv, collapse job_state
    letters to 'queued' (Q, W) or 'running' (R, E), and drop empty values."""
    sdict = tam.sub_dict(indict, usedkeys.keys())
    odict = dict()
    for k in sdict.keys():
        if k in tam.tfl2 and sdict[k]:
            secs = tam.tconv(sdict[k])
            sdict[k] = secs
        elif k == 'job_state':
            statelett = sdict[k]
            if statelett in ['Q','W']:
                sdict[k] = 'queued'
            elif statelett in ['R','E']:
                sdict[k] = 'running'
        if sdict[k]:
            odict[k] = sdict[k]
    return odict
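
# e.g. (hypothetical) mapatts({'jobid': '1234567.stro.nikhef.nl', 'egroup': 'atlas',
#                              'queue': 'atlas', 'job_state': 'R'})
# keeps those four keys and returns job_state as 'running'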

newjlist = list()
for j in jlist:
    newjlist.append(mapatts(j))

import sqlite
cx = sqlite.connect(":memory:")
cu = cx.cursor()

# build table creation string

crstr = 'create table jobs ('
ctr = 0
for k in usedkeys.keys():
    if ctr > 0 : crstr = crstr + ','
    crstr = crstr + "%s %s" % (k, usedkeys[k]) ; ctr += 1
crstr += ')'

# create the table

cu.execute(crstr)
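
# the generated statement looks roughly like (column order follows the
# arbitrary Python 2 dict key order):
#   create table jobs (egroup varchar,jobid varchar,queue varchar,job_state varchar,
#                      euser varchar,qtime float,start_time float)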

# insert all the jobs found into the table

for jd in newjlist:

    # build insert string
    kl = jd.keys()
    insstr = 'insert into jobs ('
    ctr = 0
    for k in kl:
        if ctr > 0 : insstr += ','
        insstr += k
        ctr += 1
    insstr += ') values ('
    ctr = 0
    for k in kl:
        if ctr > 0 : insstr += ','
        insstr += repr(jd[k])
        ctr += 1
    insstr += ')'

    cu.execute(insstr)
    # print insstr
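
# a generated statement looks roughly like (hypothetical values; only the keys
# actually present for a given job are included):
#   insert into jobs (egroup,jobid,queue,job_state,euser,qtime,start_time)
#     values ('atlas','1234567.stro.nikhef.nl','atlas','running','tuser',1298970000.0,1298970300.0)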

# count the queued jobs per group

queuedjobs = dict()

selstr = 'select egroup,count(*) from jobs where job_state="queued" group by egroup'
cu.execute(selstr)
rl = cu.fetchall()
for r in rl:
    queuedjobs[r[0]] = r[1]

qtot = sum(queuedjobs.values())
queuedjobs['total'] = qtot
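
# queuedjobs now maps group name -> number of queued jobs, plus a grand total,
# e.g. (hypothetical): {'atlas': 40, 'lhcb': 12, 'total': 52}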

import rrdtool
import glob

# loop over all known per-group queued-job databases and update each one:
# with the counted value if the group was seen, zero otherwise.
queued_files = glob.glob(DATADIR + '*.queued.rrd')
for db in queued_files:
    group = db[len(DATADIR):db.find('.queued.rrd')]
    if group in queuedjobs.keys():
        val = queuedjobs[group]
    else:
        val = 0
    rrdtool.update(db, 'N:' + repr(val))
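
# note: the leading 'N' in an rrdtool update string means "now", so the call
# above feeds e.g. 'N:40' (hypothetical count) to the single data source each
# .queued.rrd file is assumed to contain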

# do it again for the running jobs

runningjobs = dict()
selstr = 'select egroup,count(*) from jobs where job_state="running" group by egroup'
cu.execute(selstr)
rl = cu.fetchall()
for r in rl:
    runningjobs[r[0]] = r[1]

rtot = sum(runningjobs.values())
runningjobs['total'] = rtot

running_files = glob.glob(DATADIR + '*.running.rrd')
for db in running_files:
    group = db[len(DATADIR):db.find('.running.rrd')]
    if group in runningjobs.keys():
        val = runningjobs[group]
    else:
        val = 0
    rrdtool.update(db, 'N:' + repr(val))

import time
now = time.mktime(time.localtime())

# now do the longest wait times: for each group, the age in seconds of its
# oldest still-queued job (2 seconds when the group has nothing queued)

for group in queuedjobs.keys():
    selstr = 'select qtime from jobs where job_state="queued" ' + \
             'and egroup="' + group + '"'
    cu.execute(selstr)
    rl = cu.fetchall()
    if len(rl) > 0:
        qtl = list()
        for r in rl:
            qtl.append(r[0])
        qtl.sort()
        waitt = now - qtl[0]
    else:
        waitt = 2

    rrdtool.update(DATADIR + group + '.waittime.rrd',
                   'N:' + repr(int(waitt)))
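
# each group's .waittime.rrd gets e.g. 'N:86400' (hypothetical: that group's
# oldest queued job has been waiting for a day); note that the 'total' entry
# in queuedjobs is handled by this loop as well, so a total.waittime.rrd is
# expected to exist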

# "rollover": time since the most recent running job started (floor of 2 seconds)

selstr = 'select start_time from jobs where job_state="running" and start_time<>"None"'
cu.execute(selstr)
rl = cu.fetchall()
if len(rl) > 0:
    stl = list()
    for r in rl:
        stl.append(r[0])

    stl.sort()
    lastroll = now - stl[-1]
    if lastroll < 2:
        lastroll = 2

    rrdtool.update(DATADIR + "rollover" + '.waittime.rrd',
                   'N:' + repr(int(lastroll)))

### Local Variables: ***
### mode: python ***
### End: ***