/[pdpsoft]/nl.nikhef.ndpf.groupviews/trunk/ndpf-gv-dbupdate
ViewVC logotype

Contents of /nl.nikhef.ndpf.groupviews/trunk/ndpf-gv-dbupdate

Parent Directory | Revision Log


Revision 2632 - (show annotations) (download)
Tue Jun 11 09:00:23 2013 UTC (8 years, 7 months ago) by templon
File size: 6413 byte(s)
Add code to deal with the case where the probe for nslots is broken:
set total slots to be the same as the number of running jobs.

1 #!/usr/bin/python2
2 # $Id$
3 # Source: $URL$
4 # J. A. Templon, NIKHEF/PDP 2011
5
6 # purpose: update rrd databases for ndpf group views
7 # program runs qstat, parses the output to find number of waiting and running jobs per group
8 # this is added to an RRD database
9
import os
import tempfile
import time

# Torque server that is queried, and the directory holding the RRD files.
TORQUE = "stro.nikhef.nl"
DATADIR = os.environ['HOME'] + '/ndpfdata/'
WAITDB = DATADIR + 'ndpfwaiting.rrd'
RUNDB = DATADIR + 'ndpfrunning.rrd'

# Scratch file under $HOME/tmp that receives the raw qstat output.
# NOTE(review): tempfile.mktemp() only picks a name and is documented as
# deprecated because of the race between choosing the name and creating the
# file; it is left as-is here so the dump keeps being created by the shell.
qsfname = tempfile.mktemp(".txt", "qsmf", os.environ['HOME'] + "/tmp")

# Dump the full job listing of the server; the shell does the redirection.
os.system('/usr/bin/qstat -f @%s > %s' % (TORQUE, qsfname))

# Reference time (seconds since the epoch) used for all age calculations.
now = time.mktime(time.localtime())

import torqueJobs
jlist = torqueJobs.qs_parsefile(qsfname)

# Keep the most recent dump around under a fixed name.
os.system('mv %s %s/tmp/qstat.last.txt' % (qsfname, os.environ['HOME']))
27
28 import torqueAttMappers as tam
# Job attributes that are kept, each mapped to the SQLite column type
# used for it in the jobs table.
usedkeys = {
    'egroup': 'varchar',
    'jobid': 'varchar',
    'queue': 'varchar',
    'job_state': 'varchar',
    'euser': 'varchar',
    'qtime': 'float',
    'start_time': 'float',
    'comp_time': 'float',
    'Resource_List.nodect': 'integer',
}

# Attribute names that are renamed when they become column names.
amap = {
    'Resource_List.nodect': 'nodect',
}

# Column name -> column type: usedkeys with the amap renames applied.
sqlitekeys = dict((amap.get(attname, attname), coltype)
                  for attname, coltype in usedkeys.items())
42
def mapatts(indict):
    """Reduce one parsed job record to the values stored in the jobs table.

    Only the attributes named in usedkeys are considered (selected with
    tam.sub_dict).  Values of attributes listed in tam.tfl2 are passed
    through tam.tconv, job_state letters Q/W become 'queued' and R/E become
    'running', and keys listed in amap are renamed.  Attributes whose final
    value is false are left out.  'nodect' is set to 1 when it is missing.

    Returns the resulting dict.
    """
    selected = tam.sub_dict(indict, usedkeys.keys())
    odict = dict()
    for attname in selected.keys():
        value = selected[attname]
        if attname in tam.tfl2 and value:
            # time-like attribute with a value: convert it
            value = tam.tconv(value)
        elif attname == 'job_state':
            if value in ('Q', 'W'):
                value = 'queued'
            elif value in ('R', 'E'):
                value = 'running'
        if value:
            odict[amap.get(attname, attname)] = value
    # jobs without a node count are counted as a single slot
    if 'nodect' not in odict:
        odict['nodect'] = 1
    return odict
65
# Normalise every parsed job record before loading it into SQLite.
newjlist = [mapatts(j) for j in jlist]

import sqlite
cx = sqlite.connect(":memory:")
cu = cx.cursor()

# Create the jobs table with one "<name> <type>" column per sqlitekeys entry.
column_defs = ["%s %s" % (k, sqlitekeys[k]) for k in sqlitekeys.keys()]
crstr = 'create table jobs (' + ','.join(column_defs) + ')'
cu.execute(crstr)

# Insert every job as one row, naming only the columns that job has.
# NOTE(review): values are embedded in the statement with repr(), i.e. as
# Python literals rather than bound parameters -- confirm this is safe for
# all attribute values qstat can report.
for jd in newjlist:
    kl = jd.keys()
    column_list = ','.join(kl)
    value_list = ','.join([repr(jd[k]) for k in kl])
    insstr = 'insert into jobs (' + column_list + ') values (' + value_list + ')'
    cu.execute(insstr)
108
# Number of queued jobs per group; the extra 'total' entry is the sum
# over all groups.
queuedjobs = dict()

selstr = 'select egroup,count(*) from jobs where job_state="queued" group by egroup'
cu.execute(selstr)
for row in cu.fetchall():
    queuedjobs[row[0]] = row[1]

qtot = sum(queuedjobs.values())
queuedjobs['total'] = qtot

import rrdtool
import glob

# Update every existing <name>.queued.rrd file in DATADIR; names without an
# entry in queuedjobs are updated with zero.
for db in glob.glob(DATADIR + '*.queued.rrd'):
    suffix_at = db.find('.queued.rrd')
    group = db[len(DATADIR):suffix_at]
    rrdtool.update(db, 'N:' + repr(queuedjobs.get(group, 0)))
133
134 # do it again for the running jobs
135
136 runningjobs = dict()
137 selstr = 'select egroup,sum(nodect) from jobs where job_state="running" group by egroup'
138 cu.execute(selstr)
139 rl = cu.fetchall()
140 for r in rl:
141 runningjobs[r[0]] = r[1]
142
143 rtot = sum(runningjobs.values())
144 totslots = int(open(DATADIR+'nslots').read())
145 if totslots < rtot:
146 print "Total slots from nslots file lower than total running jobs"
147 print totslots, "<", rtot
148 print "Setting total slots to total running jobs"
149 totslots = rtot
150 offlineslots = int(open(DATADIR+'oslots').read())
151 runningjobs['offline'] = offlineslots
152 runningjobs['unused'] = totslots - offlineslots - rtot
153 runningjobs['total'] = totslots
154
155 running_files = glob.glob(DATADIR+'*.running.rrd')
156 for db in running_files:
157 group = db[len(DATADIR):db.find('.running.rrd')]
158 if group in runningjobs.keys():
159 val = runningjobs[group]
160 else:
161 val = 0
162 rrdtool.update(db, 'N:' + repr(val))
163
# Longest wait per queuedjobs key: the age, relative to 'now', of the
# smallest qtime among that group's queued jobs.
# NOTE(review): every key except 'total' comes from the queued-jobs query and
# so has at least one row here; the placeholder value 2 is therefore what
# 'total' receives (unless a group is literally called "total") -- confirm
# that this is intended.
for group in queuedjobs.keys():
    selstr = ('select qtime from jobs where job_state="queued" '
              'and egroup="' + group + '"')
    cu.execute(selstr)
    qtimes = [row[0] for row in cu.fetchall()]
    if qtimes:
        waitt = now - min(qtimes)
    else:
        waitt = 2

    rrdtool.update(DATADIR + group + '.waittime.rrd',
                   'N:' + repr(int(waitt)))
182
# rollover : avg core turnover in last 60 seconds; jobs completing!
#
# Three cases, in order of preference:
#   * jobs completed within the last 60 seconds: 60 / (number of them)
#   * otherwise, two or more completed jobs known: half the time since the
#     second most recent completion
#   * otherwise, one completed job: time since that completion; none: 600

# DEBUG is consulted in the fallback branches below but is not assigned
# anywhere else in this script, which made those branches raise NameError.
# Default it to off, while still honouring a value defined earlier.
DEBUG = globals().get('DEBUG', False)

selstr = 'select comp_time from jobs where comp_time<>"None" and comp_time > (' +\
         repr(now) + ' - 60)'
cu.execute(selstr)
rl = cu.fetchall()
if len(rl) > 0:
    rollover = 60.0/len(rl)
else:
    selstr = 'select comp_time from jobs where comp_time<>"None"'
    cu.execute(selstr)
    rl = cu.fetchall()
    if len(rl) > 1:
        stl = [r[0] for r in rl]
        stl.sort()
        rollover = (now - stl[-2])/2.
    elif len(rl) == 1:
        rollover = now - rl[0][0]
        if DEBUG: print('used time since last start')
    else:
        rollover = 600  # if nothing seen, all we know is, not in last ten minutes (keep_completed)
        if DEBUG: print('set to 600, no comp_times seen')

rrdtool.update(DATADIR + "rollover" + '.waittime.rrd',
               'N:' + repr(int(rollover)))
214
# lastroll : time since the latest start_time recorded for any job,
# never reported as less than 1
selstr = 'select start_time from jobs where start_time<>"None"'
cu.execute(selstr)
start_times = [row[0] for row in cu.fetchall()]
if start_times:
    lastroll = max(now - max(start_times), 1)
else:
    lastroll = 6000  # signal that no starts are recorded in system.

rrdtool.update(DATADIR + "lastroll" + '.waittime.rrd',
               'N:' + repr(int(lastroll)))

# done with the in-memory database
cx.close()
236
237 ### Local Variables: ***
238 ### mode: python ***
239 ### End: ***
240

Properties

Name Value
svn:executable *
svn:keywords Id URL

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28