/[pdpsoft]/nl.nikhef.ndpf.groupviews/trunk/ndpf-gv-dbupdate
ViewVC logotype

Contents of /nl.nikhef.ndpf.groupviews/trunk/ndpf-gv-dbupdate

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3131 - (show annotations) (download)
Tue Oct 25 10:18:12 2016 UTC (5 years, 2 months ago) by templon
File size: 7032 byte(s)
dbupdate: move time computation to better match delays in returning torque
data.
mkplots: cosmetic changes for color stability when offline/unused on plots

1 #!/usr/bin/python2
2 # $Id$
3 # Source: $URL$
4 # J. A. Templon, NIKHEF/PDP 2011
5
6 # purpose: update rrd databases for ndpf group views
7 # program runs qstat, parses the output to find number of waiting and running jobs per group
8 # this is added to an RRD database
9
# Nonzero enables extra diagnostics on stdout (see the rollover section below).
DEBUG = 1
import os
import tempfile
# Scratch file that will receive the raw `qstat -f` dump.
# NOTE(review): tempfile.mktemp only generates a name and is race-prone
# (another process can claim the path before we use it); tempfile.mkstemp
# would be safer — confirm before changing, since the name is passed to a
# shell redirection below.
qsfname = tempfile.mktemp(".txt","qsmf",os.environ['HOME']+"/tmp")

# Torque server queried for the job list.
TORQUE="korf.nikhef.nl"
# Directory holding the per-group RRD databases and the nslots/oslots files.
DATADIR=os.environ['HOME'] + '/ndpfdata/'
WAITDB = DATADIR + 'ndpfwaiting.rrd'
RUNDB = DATADIR + 'ndpfrunning.rrd'

import time
# old line for stro os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)

# Reference timestamp taken just before the qstat call, so the wait-time and
# rollover computations below line up with the moment the data was requested
# (per the r3131 log message: moved to better match delays in torque output).
now = time.mktime(time.localtime())
os.system('/opt/torque4/bin/qstat-torque -f @' + TORQUE + ' > ' + qsfname)

import torqueJobs
# Parse the full-format qstat dump into a list of per-job attribute dicts.
jlist = torqueJobs.qs_parsefile(qsfname)

# Keep the most recent raw qstat output around for post-mortem debugging.
os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')
30
31 import torqueAttMappers as tam
# Torque job attributes we keep, mapped to the sqlite column type used to
# store each of them.
usedkeys = {'egroup' : 'varchar', 'jobid' : 'varchar', 'queue' : 'varchar',
            'job_state' : 'varchar', 'euser' : 'varchar', 'qtime' : 'float',
            'start_time' : 'float', 'comp_time' : 'float',
            'exec_host' : 'integer' }

# Attribute renames applied when storing: the raw exec_host string is
# reduced elsewhere to a core count, stored under 'corect'.
amap = {
    'exec_host' : 'corect'
}

# Column-name -> column-type mapping for the sqlite jobs table: identical to
# usedkeys except that keys listed in amap are renamed.
sqlitekeys = dict((amap.get(k, k), ctype) for k, ctype in usedkeys.items())
45
def mapatts(indict):
    """Reduce a raw qstat job dict to the fields stored in the sqlite table.

    Keeps only the keys listed in the module-level ``usedkeys``.  Time-like
    fields (those in ``tam.tfl2``) are converted to seconds via
    ``tam.tconv``; the Torque single-letter job state is collapsed to
    'queued' (Q/W) or 'running' (R/E); exec_host is reduced from a
    '+'-joined slot list to a slot count.  Keys whose (converted) value is
    falsy are dropped from the result, and keys present in ``amap`` are
    renamed on output.
    """
    kept = tam.sub_dict(indict, usedkeys.keys())
    result = dict()
    for key in kept.keys():
        val = kept[key]
        if key in tam.tfl2 and val:
            # timestamp-like attribute: convert to seconds
            kept[key] = tam.tconv(val)
        elif key == 'job_state':
            # collapse Torque state letters into two coarse buckets
            if val in ('Q', 'W'):
                kept[key] = 'queued'
            elif val in ('R', 'E'):
                kept[key] = 'running'
        elif key == 'exec_host' and val:
            # exec_host is a '+'-joined list of slots; count the entries
            kept[key] = val.count('+') + 1
        # drop falsy values; rename keys listed in amap
        if kept[key]:
            result[amap.get(key, key)] = kept[key]
    return result
69
# Reduce every raw job dict to the subset of mapped attributes we store.
newjlist = list()
for j in jlist:
    newjlist.append(mapatts(j))

import sqlite
# In-memory scratch database: the jobs table lives only for this run.
cx = sqlite.connect(":memory:")
cu = cx.cursor()

# build table creation string

crstr = 'create table jobs ('
ctr = 0
for k in sqlitekeys.keys():
    if ctr > 0 : crstr = crstr + ','
    crstr = crstr + "%s %s" % (k, sqlitekeys[k]) ; ctr += 1
crstr += ')'

# create the table

cu.execute(crstr)

# insert all the jobs found into the table
# NOTE(review): the insert statements are built by string concatenation with
# repr() of the values rather than parameterized queries.  The values come
# from the local qstat output, but parameter binding would still be safer —
# confirm the sqlite module version supports it before changing.

for jd in newjlist:

    # build insert string: column list first, then the repr()'d values
    kl = jd.keys()
    insstr = 'insert into jobs ('
    ctr = 0
    for k in kl:
        if ctr > 0 : insstr += ','
        insstr += k
        ctr += 1
    insstr += ') values ('
    ctr = 0
    for k in kl:
        if ctr > 0 : insstr += ','
        insstr += repr(jd[k])
        ctr += 1
    insstr += ')'

    # print insstr
    cu.execute(insstr)
113
# find the list of all queued jobs vs. group

# group name -> number of queued jobs
queuedjobs = dict()

selstr = 'select egroup,count(*) from jobs where job_state="queued" group by egroup'
cu.execute(selstr)
rl = cu.fetchall()
for r in rl:
    queuedjobs[r[0]] = r[1]

# pseudo-group 'total' holds the sum over all real groups
qtot = sum(queuedjobs.values())
queuedjobs['total'] = qtot

import rrdtool
# loop over all known databases and update; with val if known, zero otherwise.
# The set of groups tracked is defined by which *.queued.rrd files exist in
# DATADIR, not by what appears in the qstat output.
import glob
queued_files = glob.glob(DATADIR+'*.queued.rrd')
for db in queued_files:
    # group name is the filename between DATADIR and '.queued.rrd'
    group = db[len(DATADIR):db.find('.queued.rrd')]
    if group in queuedjobs.keys():
        val = queuedjobs[group]
    else:
        val = 0
    rrdtool.update(db, 'N:' + repr(val))
138
# do it again for the running jobs

# group name -> number of occupied cores (sum of corect)
runningjobs = dict()
selstr = 'select egroup,sum(corect) from jobs where job_state="running" group by egroup'
cu.execute(selstr)
rl = cu.fetchall()
# debug
# print selstr
for r in rl:
    # sum(corect) is NULL when no job of the group had an exec_host;
    # warn instead of recording a bogus value
    if r[1] == None:
        print "Check for jobs from group %s with no exec host" % r[0]
    else:
        runningjobs[r[0]] = r[1]

rtot = sum(runningjobs.values())
# total slot count is maintained externally in the 'nslots' file
totslots = int(open(DATADIR+'nslots').read())
if totslots < rtot:
    # stale nslots file: clamp so 'unused' below cannot go negative
    print "Total slots from nslots file lower than total running jobs"
    print totslots, "<", rtot
    print "Setting total slots to total running jobs"
    totslots = rtot
# offline slot count is maintained externally in the 'oslots' file
offlineslots = int(open(DATADIR+'oslots').read())
# pseudo-groups for the plots: offline, unused and total capacity
runningjobs['offline'] = offlineslots
runningjobs['unused'] = totslots - offlineslots - rtot
runningjobs['total'] = totslots

# as for queued: the *.running.rrd files present define which groups we track
running_files = glob.glob(DATADIR+'*.running.rrd')
for db in running_files:
    group = db[len(DATADIR):db.find('.running.rrd')]
    if group in runningjobs.keys():
        val = runningjobs[group]
    else:
        val = 0
    rrdtool.update(db, 'N:' + repr(val))
173
# now do longest wait times

# One waittime RRD per group that currently has queued jobs.  NOTE(review):
# queuedjobs also contains the pseudo-group 'total', whose select below
# matches nothing, so total.waittime.rrd gets the floor value — presumably
# intentional; verify.
for group in queuedjobs.keys():
    selstr = 'select qtime from jobs where job_state="queued" ' + \
             'and egroup="' + group + '"'
    cu.execute(selstr)
    rl = cu.fetchall()
    if len(rl) > 0:
        # longest wait = now minus the oldest (smallest) queue timestamp
        qtl = list()
        for r in rl:
            qtl.append(r[0])
        qtl.sort()
        waitt = now - qtl[0]
    else:
        # no queued jobs found for this group: record a small floor value
        waitt = 2

    rrdtool.update(DATADIR + group + '.waittime.rrd',
                   'N:' + repr(int(waitt)))
192
# rollover : avg core turnover in last 60 seconds; jobs completing!

#selstr_dbg = 'SELECT jobid, comp_time, typeof(comp_time) from jobs' + \
#   ' where comp_time <> "None"'
#cu.execute(selstr_dbg)
#rl = cu.fetchall()
#print rl

# comp_time is stored as the string "None" when absent, hence the <>"None"
# filter; first try: completions within the last 60 seconds.
selstr = 'select comp_time from jobs where comp_time<>"None" ' + \
         'and comp_time > ' + repr(now-60)

cu.execute(selstr)
rl = cu.fetchall()

# print selstr
# print rl

if len(rl) > 0:
    # average seconds between completions over the last minute
    stl = list()
    for r in rl:
        stl.append(r[0])
    rollover = 60.0/len(stl)
else:
    # nothing completed in the last minute: fall back to all recorded
    # completion times still visible in qstat
    selstr = 'select comp_time from jobs where comp_time<>"None"'
    cu.execute(selstr)
    rl = cu.fetchall()
    if len(rl) > 1:
        # estimate from the second-most-recent completion, halved —
        # presumably a smoothing choice; confirm rationale
        stl = list()
        for r in rl:
            stl.append(r[0])
        stl.sort()
        rollover = (now - stl[-2])/2.
    elif len(rl) == 1:
        rollover = now - rl[0][0]
        if DEBUG: print 'used time since last start'
    else:
        rollover = 600 # if nothing seen, all we know is, not in last ten minutes (keep_completed)
        if DEBUG:
            print 'set to 600, no comp_times seen'
            for tj in newjlist:
                print tj

# print "len(stl), rollover:", len(stl), rollover
rrdtool.update(DATADIR + "rollover" + '.waittime.rrd',
               'N:' + repr(rollover))
238
# lastroll : last recorded start in system

# start_time, like comp_time above, is the string "None" when absent
selstr = 'select start_time from jobs where start_time<>"None"'
cu.execute(selstr)
rl = cu.fetchall()
if len(rl) > 0:
    stl = list()
    for r in rl:
        stl.append(r[0])

    # seconds since the most recent recorded job start
    stl.sort()
    lastroll = now - stl[-1]
    # DEBUG print "now, last, lastroll", now, stl[-1], lastroll
    # clamp to 1 second: the start may post-date 'now' because the qstat
    # data arrives after the timestamp was taken
    if lastroll < 1:
        lastroll = 1
else:
    lastroll = 6000 # signal that no starts are recorded in system.

rrdtool.update(DATADIR + "lastroll" + '.waittime.rrd',
               'N:' + repr(int(lastroll)))

cx.close()
261
262 ### Local Variables: ***
263 ### mode: python ***
264 ### End: ***
265

Properties

Name Value
svn:executable *
svn:keywords Id URL

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28