/[pdpsoft]/nl.nikhef.ndpf.groupviews/trunk/ndpf-gv-dbupdate
ViewVC logotype

Contents of /nl.nikhef.ndpf.groupviews/trunk/ndpf-gv-dbupdate

Parent Directory | Revision Log


Revision 2637 - (show annotations) (download)
Thu Jun 27 13:05:31 2013 UTC (9 years, 3 months ago) by templon
File size: 6534 byte(s)
add alicepil group
bit of debugging info dbupdate

1 #!/usr/bin/python2
2 # $Id$
3 # Source: $URL$
4 # J. A. Templon, NIKHEF/PDP 2011
5
6 # purpose: update rrd databases for ndpf group views
7 # program runs qstat, parses the output to find number of waiting and running jobs per group
8 # this is added to an RRD database
9
DEBUG = 1  # nonzero enables the diagnostic prints in the rollover section
import os
import tempfile

# Scratch file for the raw `qstat -f` output, placed in ~/tmp.  mkstemp()
# creates the file atomically (mode 0600) and returns an open descriptor,
# which avoids the name-reuse race that makes the deprecated mktemp() unsafe.
# Only the name is needed further on, so the descriptor is closed at once.
qsfd, qsfname = tempfile.mkstemp(".txt", "qsmf", os.environ['HOME'] + "/tmp")
os.close(qsfd)

TORQUE = "stro.nikhef.nl"  # Torque server that qstat is pointed at
# Directory holding the per-group RRD files and the nslots/oslots files.
DATADIR = os.environ['HOME'] + '/ndpfdata/'
# NOTE(review): WAITDB and RUNDB are not referenced elsewhere in this script.
WAITDB = DATADIR + 'ndpfwaiting.rrd'
RUNDB = DATADIR + 'ndpfrunning.rrd'
19
import time
import shutil
import subprocess

# Dump the full job list of the Torque server into the scratch file.  The
# command runs without a shell, so the paths need no quoting: a $HOME that
# contains spaces or shell metacharacters can no longer break it.  As before,
# qstat's exit status is not checked.
qsout = open(qsfname, 'w')
try:
    subprocess.call(['/usr/bin/qstat', '-f', '@' + TORQUE], stdout=qsout)
finally:
    qsout.close()
# Reference time for all "seconds ago" values below, taken right after the snapshot.
now = time.mktime(time.localtime())

import torqueJobs
# presumably a list of per-job attribute dicts (consumed by mapatts) -- verify in torqueJobs
jlist = torqueJobs.qs_parsefile(qsfname)

# Keep the raw output of the most recent run for inspection.
shutil.move(qsfname, os.environ['HOME'] + '/tmp/qstat.last.txt')
28
29 import torqueAttMappers as tam
# Job attributes kept from the qstat records, each with the SQL column type
# it gets in the in-memory jobs table.
usedkeys = {
    'egroup': 'varchar',
    'jobid': 'varchar',
    'queue': 'varchar',
    'job_state': 'varchar',
    'euser': 'varchar',
    'qtime': 'float',
    'start_time': 'float',
    'comp_time': 'float',
    'exec_host': 'integer',
}
# Attributes that are stored under a different column name: exec_host is
# turned into a count by mapatts and stored in the corect column.
amap = {
    'exec_host': 'corect',
}

# SQL column name -> SQL type, i.e. usedkeys with the amap renames applied.
sqlitekeys = dict((amap.get(attr, attr), sqltype)
                  for attr, sqltype in usedkeys.items())
43
def mapatts(indict):
    """Reduce one parsed qstat job record to the columns of the jobs table.

    Only the attributes named in usedkeys are kept (selected via tam.sub_dict).
    Values are then normalised:
      * attributes listed in tam.tfl2 that are set go through tam.tconv
        (presumably a time string -> seconds conversion; verify in
        torqueAttMappers);
      * job_state 'Q' or 'W' becomes 'queued', 'R' or 'E' becomes 'running';
        any other state letter is kept unchanged;
      * a non-empty exec_host string becomes the number of '+'-separated
        entries it contains.
    Attributes whose final value is falsy are left out, and names listed in
    amap are renamed (exec_host -> corect).

    Returns a new dict suitable for inserting into the jobs table.
    """
    attrs = tam.sub_dict(indict, usedkeys.keys())
    mapped = {}
    for name in attrs.keys():
        value = attrs[name]
        if name in tam.tfl2 and value:
            value = tam.tconv(value)
        elif name == 'job_state':
            if value in ['Q', 'W']:
                value = 'queued'
            elif value in ['R', 'E']:
                value = 'running'
        elif name == 'exec_host' and value:
            # entries are joined by '+', so there is one more entry than '+'
            value = value.count('+') + 1
        attrs[name] = value
        if value:
            mapped[amap.get(name, name)] = value
    return mapped
67
# Normalise every parsed job record for insertion into the jobs table.
newjlist = [mapatts(j) for j in jlist]
71
72 import sqlite
cx = sqlite.connect(":memory:")  # throw-away database, rebuilt on every run
cu = cx.cursor()

# Create the jobs table with one column per sqlitekeys entry
# (column name -> SQL type), e.g. "create table jobs (egroup varchar,...)".
coldefs = ["%s %s" % (col, sqlitekeys[col]) for col in sqlitekeys.keys()]
crstr = 'create table jobs (' + ','.join(coldefs) + ')'
cu.execute(crstr)
88
# Insert every job into the table: one INSERT per job, naming only the
# columns that job actually has (the other columns are left NULL).
# NOTE(review): values are spliced into the SQL text with repr() instead of
# being bound as parameters; this relies on repr() giving a valid SQL literal
# for every value (a unicode string, for instance, would come out as u'...').
for job in newjlist:
    cols = list(job.keys())
    vals = [repr(job[col]) for col in cols]
    insstr = ('insert into jobs (' + ','.join(cols) + ') values (' +
              ','.join(vals) + ')')
    cu.execute(insstr)
110
# Number of queued jobs per group; the extra 'total' key holds the sum over
# all groups.
cu.execute('select egroup,count(*) from jobs where job_state="queued" group by egroup')
queuedjobs = dict((row[0], row[1]) for row in cu.fetchall())

qtot = sum(queuedjobs.values())
queuedjobs['total'] = qtot
123
124 import rrdtool
# Update every existing "<group>.queued.rrd" file in DATADIR: the group's
# queued-job count if it has any queued jobs, zero otherwise.
import glob
for rrdfile in glob.glob(DATADIR + '*.queued.rrd'):
    groupname = rrdfile[len(DATADIR):rrdfile.find('.queued.rrd')]
    rrdtool.update(rrdfile, 'N:' + repr(queuedjobs.get(groupname, 0)))
135
136 # do it again for the running jobs
137
138 runningjobs = dict()
139 selstr = 'select egroup,sum(corect) from jobs where job_state="running" group by egroup'
140 cu.execute(selstr)
141 rl = cu.fetchall()
142 for r in rl:
143 runningjobs[r[0]] = r[1]
144
145 rtot = sum(runningjobs.values())
146 totslots = int(open(DATADIR+'nslots').read())
147 if totslots < rtot:
148 print "Total slots from nslots file lower than total running jobs"
149 print totslots, "<", rtot
150 print "Setting total slots to total running jobs"
151 totslots = rtot
152 offlineslots = int(open(DATADIR+'oslots').read())
153 runningjobs['offline'] = offlineslots
154 runningjobs['unused'] = totslots - offlineslots - rtot
155 runningjobs['total'] = totslots
156
# Same per-file update for every existing "<group>.running.rrd": the group's
# running-core figure if known, zero otherwise.
for rrdfile in glob.glob(DATADIR + '*.running.rrd'):
    groupname = rrdfile[len(DATADIR):rrdfile.find('.running.rrd')]
    rrdtool.update(rrdfile, 'N:' + repr(runningjobs.get(groupname, 0)))
165
# Longest wait times: for every key of queuedjobs (including the synthetic
# 'total' entry) record, in whole seconds, how long the group's oldest queued
# job has been waiting; 2 is written when no queued job matches the group.
# NOTE(review): unlike the queued/running loops, this writes to
# "<group>.waittime.rrd" without checking that the file exists.
for group in queuedjobs.keys():
    selstr = ('select qtime from jobs where job_state="queued" ' +
              'and egroup="' + group + '"')
    cu.execute(selstr)
    qtimes = [row[0] for row in cu.fetchall()]
    if qtimes:
        waitt = now - min(qtimes)
    else:
        waitt = 2
    rrdtool.update(DATADIR + group + '.waittime.rrd',
                   'N:' + repr(int(waitt)))
184
185 # rollover : avg core turnover in last 60 seconds; jobs completing!
186
187 selstr = 'select comp_time from jobs where comp_time<>"None" and comp_time > (' +\
188 repr(now) + ' - 60)'
189 cu.execute(selstr)
190 rl = cu.fetchall()
191 if len(rl) > 0:
192 stl = list()
193 for r in rl:
194 stl.append(r[0])
195 rollover = 60.0/len(stl)
196 else:
197 selstr = 'select comp_time from jobs where comp_time<>"None"'
198 cu.execute(selstr)
199 rl = cu.fetchall()
200 if len(rl) > 1:
201 stl = list()
202 for r in rl:
203 stl.append(r[0])
204 stl.sort()
205 rollover = (now - stl[-2])/2.
206 elif len(rl) == 1:
207 rollover = now - rl[0][0]
208 if DEBUG: print 'used time since last start'
209 else:
210 rollover = 600 # if nothing seen, all we know is, not in last ten minutes (keep_completed)
211 if DEBUG:
212 print 'set to 600, no comp_times seen'
213 for tj in newjlist:
214 print tj
215
216 # print "len(stl), rollover:", len(stl), rollover
217 rrdtool.update(DATADIR + "rollover" + '.waittime.rrd',
218 'N:' + repr(int(rollover)))
219
# lastroll: seconds since the most recent job start in the system, never less
# than 1; 6000 signals that no start times are recorded at all.
cu.execute('select start_time from jobs where start_time<>"None"')
starts = [row[0] for row in cu.fetchall()]
if starts:
    lastroll = max(now - max(starts), 1)
else:
    lastroll = 6000  # signal that no starts are recorded in system.

rrdtool.update(DATADIR + "lastroll" + '.waittime.rrd',
               'N:' + repr(int(lastroll)))

cx.close()
241
242 ### Local Variables: ***
243 ### mode: python ***
244 ### End: ***
245

Properties

Name Value
svn:executable *
svn:keywords Id URL

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28