Contents of /nl.nikhef.ndpf.groupviews/trunk/ndpf-gv-dbupdate

Revision 2623 - Wed May 22 14:50:02 2013 UTC by templon
File size: 6218 byte(s)
make changes to have sections on the plot (and associated colors)
for "unused" capacity, "offline" capacity, and "other" jobs not
represented in the top 8 groups.
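
The "unused" and "offline" sections are fed from two small count files kept next to the RRDs; a minimal sketch of the bookkeeping, using the same names the script below uses:

    totslots = int(open(DATADIR + 'nslots').read())       # total slot count
    offlineslots = int(open(DATADIR + 'oslots').read())    # slots marked offline
    unused = totslots - offlineslots - rtot                # rtot = slots occupied by running jobs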

#!/usr/bin/python2
# $Id$
# Source: $URL$
# J. A. Templon, NIKHEF/PDP 2011

# purpose: update the RRD databases for the NDPF group views
# the program runs qstat, parses the output to find the number of waiting and
# running jobs per group, and adds those counts to the RRD databases

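# Rough data flow (summary of the steps below): the qstat -f output is parsed by
# torqueJobs.qs_parsefile(), the resulting jobs are loaded into an in-memory
# sqlite table, and per-group totals are then written to per-group RRD files
# (<group>.queued.rrd, <group>.running.rrd, <group>.waittime.rrd) via rrdtool.update().
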
import os
import tempfile
qsfname = tempfile.mktemp(".txt", "qsmf", os.environ['HOME'] + "/tmp")

DEBUG = False   # enables the diagnostic prints further down

TORQUE = "stro.nikhef.nl"
DATADIR = os.environ['HOME'] + '/ndpfdata/'
WAITDB = DATADIR + 'ndpfwaiting.rrd'
RUNDB = DATADIR + 'ndpfrunning.rrd'

import time
os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)
now = time.mktime(time.localtime())

import torqueJobs
jlist = torqueJobs.qs_parsefile(qsfname)

os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')

import torqueAttMappers as tam
usedkeys = {'egroup': 'varchar', 'jobid': 'varchar', 'queue': 'varchar', 'job_state': 'varchar',
            'euser': 'varchar', 'qtime': 'float', 'start_time': 'float',
            'comp_time': 'float', 'Resource_List.nodect': 'integer'}

amap = {
    'Resource_List.nodect': 'nodect'
}
sqlitekeys = dict()
for k in usedkeys.keys():
    if k in amap.keys():
        sqlitekeys[amap[k]] = usedkeys[k]
    else:
        sqlitekeys[k] = usedkeys[k]

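# sqlitekeys maps each Torque attribute to the sqlite column name and type used
# below; only 'Resource_List.nodect' is renamed (to 'nodect', via amap), all
# other attributes keep their names.
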
def mapatts(indict):
    sdict = tam.sub_dict(indict, usedkeys.keys())
    odict = dict()
    changekeys = amap.keys()
    for k in sdict.keys():
        if k in tam.tfl2 and sdict[k]:
            secs = tam.tconv(sdict[k])
            sdict[k] = secs
        elif k == 'job_state':
            statelett = sdict[k]
            if statelett in ['Q', 'W']:
                sdict[k] = 'queued'
            elif statelett in ['R', 'E']:
                sdict[k] = 'running'
        if sdict[k]:
            if k in changekeys:
                odict[amap[k]] = sdict[k]
            else:
                odict[k] = sdict[k]
    if 'nodect' not in odict.keys():
        odict['nodect'] = 1
    return odict

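# mapatts() keeps only the attributes in usedkeys, converts the time fields to
# numeric timestamps (tam.tconv), collapses job_state to 'queued'/'running', and
# drops empty values; an illustrative result (all values made up):
#   {'egroup': 'atlas', 'euser': 'someuser', 'jobid': '1234.stro.nikhef.nl',
#    'queue': 'medium', 'job_state': 'running', 'qtime': 1369200000.0,
#    'start_time': 1369201000.0, 'nodect': 1}
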
newjlist = list()
for j in jlist:
    newjlist.append(mapatts(j))

import sqlite
cx = sqlite.connect(":memory:")
cu = cx.cursor()

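# The mapped jobs go into an in-memory sqlite table so that the per-group totals
# below can be computed with simple "group by" queries.
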
# build table creation string

crstr = 'create table jobs ('
ctr = 0
for k in sqlitekeys.keys():
    if ctr > 0: crstr = crstr + ','
    crstr = crstr + "%s %s" % (k, sqlitekeys[k]); ctr += 1
crstr += ')'
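# With the dictionaries above, crstr ends up as something like (column order
# follows dict iteration order):
#   create table jobs (egroup varchar, jobid varchar, queue varchar, job_state varchar,
#                      euser varchar, qtime float, start_time float, comp_time float,
#                      nodect integer)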

# create the table

cu.execute(crstr)

# insert all the jobs found into the table

for jd in newjlist:

    # build insert string
    kl = jd.keys()
    insstr = 'insert into jobs ('
    ctr = 0
    for k in kl:
        if ctr > 0: insstr += ','
        insstr += k
        ctr += 1
    insstr += ') values ('
    ctr = 0
    for k in kl:
        if ctr > 0: insstr += ','
        insstr += repr(jd[k])
        ctr += 1
    insstr += ')'

    cu.execute(insstr)

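# An illustrative statement as built above (values made up; repr() provides the
# quoting for string fields):
#   insert into jobs (egroup,jobid,queue,job_state,euser,qtime,nodect)
#   values ('atlas','1234.stro.nikhef.nl','medium','queued','someuser',1369200000.0,1)
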
# find the list of all queued jobs vs. group

queuedjobs = dict()

selstr = 'select egroup,count(*) from jobs where job_state="queued" group by egroup'
cu.execute(selstr)
rl = cu.fetchall()
for r in rl:
    queuedjobs[r[0]] = r[1]

qtot = sum(queuedjobs.values())
queuedjobs['total'] = qtot

import rrdtool
# loop over all known databases and update; with val if known, zero otherwise.
import glob
queued_files = glob.glob(DATADIR + '*.queued.rrd')
for db in queued_files:
    group = db[len(DATADIR):db.find('.queued.rrd')]
    if group in queuedjobs.keys():
        val = queuedjobs[group]
    else:
        val = 0
    rrdtool.update(db, 'N:' + repr(val))

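# In the rrdtool.update calls here and below, 'N' in the 'N:<value>' argument
# stands for the current time, so every sample is stamped "now".
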
# do it again for the running jobs

runningjobs = dict()
selstr = 'select egroup,sum(nodect) from jobs where job_state="running" group by egroup'
cu.execute(selstr)
rl = cu.fetchall()
for r in rl:
    runningjobs[r[0]] = r[1]

rtot = sum(runningjobs.values())
totslots = int(open(DATADIR + 'nslots').read())
offlineslots = int(open(DATADIR + 'oslots').read())
runningjobs['offline'] = offlineslots
runningjobs['unused'] = totslots - offlineslots - rtot
runningjobs['total'] = totslots

running_files = glob.glob(DATADIR + '*.running.rrd')
for db in running_files:
    group = db[len(DATADIR):db.find('.running.rrd')]
    if group in runningjobs.keys():
        val = runningjobs[group]
    else:
        val = 0
    rrdtool.update(db, 'N:' + repr(val))

# now do longest wait times

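# For each group, record the age in seconds of its oldest still-queued job;
# a token value of 2 seconds is written when the group has nothing queued.
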
for group in queuedjobs.keys():
    selstr = 'select qtime from jobs where job_state="queued" ' + \
             'and egroup="' + group + '"'
    cu.execute(selstr)
    rl = cu.fetchall()
    if len(rl) > 0:
        qtl = list()
        for r in rl:
            qtl.append(r[0])
        qtl.sort()
        waitt = now - qtl[0]
    else:
        waitt = 2

    rrdtool.update(DATADIR + group + '.waittime.rrd',
                   'N:' + repr(int(waitt)))

# rollover : avg core turnover in last 60 seconds; jobs completing!

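# "rollover" estimates the average time between job completions: 60/N seconds if
# N jobs completed during the last minute; otherwise half the time since the
# second-most-recent completion, or the time since the only known completion, or
# 600 seconds when no completion times are visible at all (keep_completed window).
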
selstr = 'select comp_time from jobs where comp_time<>"None" and comp_time > (' + \
         repr(now) + ' - 60)'
cu.execute(selstr)
rl = cu.fetchall()
if len(rl) > 0:
    stl = list()
    for r in rl:
        stl.append(r[0])
    rollover = 60.0 / len(stl)
else:
    selstr = 'select comp_time from jobs where comp_time<>"None"'
    cu.execute(selstr)
    rl = cu.fetchall()
    if len(rl) > 1:
        stl = list()
        for r in rl:
            stl.append(r[0])
        stl.sort()
        rollover = (now - stl[-2]) / 2.
    elif len(rl) == 1:
        rollover = now - rl[0][0]
        if DEBUG: print 'used time since last completion'
    else:
        rollover = 600  # if nothing seen, all we know is: not in the last ten minutes (keep_completed)
        if DEBUG: print 'set to 600, no comp_times seen'

# print "len(stl), rollover:", len(stl), rollover
rrdtool.update(DATADIR + "rollover" + '.waittime.rrd',
               'N:' + repr(int(rollover)))

# lastroll : last recorded start in system

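# "lastroll" is the number of seconds since the most recent recorded job start;
# 6000 is written as a sentinel when no start times appear in the qstat output.
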
selstr = 'select start_time from jobs where start_time<>"None"'
cu.execute(selstr)
rl = cu.fetchall()
if len(rl) > 0:
    stl = list()
    for r in rl:
        stl.append(r[0])

    stl.sort()
    lastroll = now - stl[-1]
    if lastroll < 1:
        lastroll = 1
else:
    lastroll = 6000  # signal that no starts are recorded in system.

rrdtool.update(DATADIR + "lastroll" + '.waittime.rrd',
               'N:' + repr(int(lastroll)))

cx.close()

### Local Variables: ***
### mode: python ***
### End: ***

Properties

Name            Value
svn:executable  *
svn:keywords    Id URL
