
Contents of /nl.nikhef.ndpf.groupviews/trunk/ndpf-gv-dbupdate



Revision 3148
Thu Jan 12 10:32:55 2017 UTC by templon
File size: 7088 bytes
Made two updates associated with the move from naab to alento:
1) use the sqlite3 module instead of sqlite (just changed the import statement)
2) recast a database-name string from UTF-8 to ASCII, as rrdtool
   doesn't understand UTF-8 strings :-)
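
In practice the two changes amount to a one-line import swap plus an explicit
encode of the file name handed to rrdtool; a minimal sketch of the pattern
(the path below is illustrative, not one used by this script):

    import sqlite3 as sqlite       # was: import sqlite
    import rrdtool

    # rrdtool update rejects UTF-8/unicode file names, so force ASCII bytes
    db = (u'/some/datadir/group.waittime.rrd').encode('ascii')
    rrdtool.update(db, 'N:42')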
   

#!/usr/bin/python2
# $Id$
# Source: $URL$
# J. A. Templon, NIKHEF/PDP 2011

# purpose: update rrd databases for ndpf group views
# program runs qstat, parses the output to find number of waiting and running jobs per group
# this is added to an RRD database

DEBUG = 1
import os
import tempfile
qsfname = tempfile.mktemp(".txt", "qsmf", os.environ['HOME'] + "/tmp")

TORQUE = "korf.nikhef.nl"
DATADIR = os.environ['HOME'] + '/ndpfdata/'
WAITDB = DATADIR + 'ndpfwaiting.rrd'
RUNDB = DATADIR + 'ndpfrunning.rrd'

import time
# old line for stro: os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)

now = time.mktime(time.localtime())
os.system('/opt/torque4/bin/qstat-torque -f @' + TORQUE + ' > ' + qsfname)

import torqueJobs
jlist = torqueJobs.qs_parsefile(qsfname)

os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')

import torqueAttMappers as tam
usedkeys = {'egroup': 'varchar', 'jobid': 'varchar', 'queue': 'varchar', 'job_state': 'varchar',
            'euser': 'varchar', 'qtime': 'float', 'start_time': 'float',
            'comp_time': 'float', 'exec_host': 'integer'}
amap = {
    'exec_host': 'corect'
}

sqlitekeys = dict()
for k in usedkeys.keys():
    if k in amap.keys():
        sqlitekeys[amap[k]] = usedkeys[k]
    else:
        sqlitekeys[k] = usedkeys[k]

def mapatts(indict):
    # reduce a raw qstat job record to the columns used in the sqlite table:
    # convert time fields to seconds, map job states to queued/running, and
    # turn exec_host into a core count
    sdict = tam.sub_dict(indict, usedkeys.keys())
    odict = dict()
    changekeys = amap.keys()
    for k in sdict.keys():
        if k in tam.tfl2 and sdict[k]:
            secs = tam.tconv(sdict[k])
            sdict[k] = secs
        elif k == 'job_state':
            statelett = sdict[k]
            if statelett in ['Q', 'W']:
                sdict[k] = 'queued'
            elif statelett in ['R', 'E']:
                sdict[k] = 'running'
        elif k == 'exec_host' and sdict[k]:
            ehoststring = sdict[k]
            sdict[k] = ehoststring.count('+') + 1
        if sdict[k]:
            if k in changekeys:
                odict[amap[k]] = sdict[k]
            else:
                odict[k] = sdict[k]
    return odict

newjlist = list()
for j in jlist:
    newjlist.append(mapatts(j))

import sqlite3 as sqlite
cx = sqlite.connect(":memory:")
cu = cx.cursor()

# build table creation string

crstr = 'create table jobs ('
ctr = 0
for k in sqlitekeys.keys():
    if ctr > 0: crstr = crstr + ','
    crstr = crstr + "%s %s" % (k, sqlitekeys[k]); ctr += 1
crstr += ')'

# create the table

cu.execute(crstr)

# insert all the jobs found into the table

for jd in newjlist:

    # build insert string
    kl = jd.keys()
    insstr = 'insert into jobs ('
    ctr = 0
    for k in kl:
        if ctr > 0: insstr += ','
        insstr += k
        ctr += 1
    insstr += ') values ('
    ctr = 0
    for k in kl:
        if ctr > 0: insstr += ','
        insstr += repr(jd[k])
        ctr += 1
    insstr += ')'

    # print insstr
    cu.execute(insstr)

# find the list of all queued jobs vs. group

queuedjobs = dict()

selstr = 'select egroup,count(*) from jobs where job_state="queued" group by egroup'
cu.execute(selstr)
rl = cu.fetchall()
for r in rl:
    queuedjobs[r[0]] = r[1]

qtot = sum(queuedjobs.values())
queuedjobs['total'] = qtot

import rrdtool
# loop over all known databases and update; with val if known, zero otherwise.
import glob
queued_files = glob.glob(DATADIR + '*.queued.rrd')
for db in queued_files:
    group = db[len(DATADIR):db.find('.queued.rrd')]
    if group in queuedjobs.keys():
        val = queuedjobs[group]
    else:
        val = 0
    rrdtool.update(db, 'N:' + repr(val))

# do it again for the running jobs

runningjobs = dict()
selstr = 'select egroup,sum(corect) from jobs where job_state="running" group by egroup'
cu.execute(selstr)
rl = cu.fetchall()
# debug
# print selstr
for r in rl:
    if r[1] is None:
        print "Check for jobs from group %s with no exec host" % r[0]
    else:
        runningjobs[r[0]] = r[1]

rtot = sum(runningjobs.values())
totslots = int(open(DATADIR + 'nslots').read())
if totslots < rtot:
    print "Total slots from nslots file lower than total running jobs"
    print totslots, "<", rtot
    print "Setting total slots to total running jobs"
    totslots = rtot
offlineslots = int(open(DATADIR + 'oslots').read())
runningjobs['offline'] = offlineslots
runningjobs['unused'] = totslots - offlineslots - rtot
runningjobs['total'] = totslots

running_files = glob.glob(DATADIR + '*.running.rrd')
for db in running_files:
    group = db[len(DATADIR):db.find('.running.rrd')]
    if group in runningjobs.keys():
        val = runningjobs[group]
    else:
        val = 0
    rrdtool.update(db, 'N:' + repr(val))

# now do longest wait times

for group in queuedjobs.keys():
    selstr = 'select qtime from jobs where job_state="queued" ' + \
             'and egroup="' + group + '"'
    cu.execute(selstr)
    rl = cu.fetchall()
    if len(rl) > 0:
        qtl = list()
        for r in rl:
            qtl.append(r[0])
        qtl.sort()
        waitt = now - qtl[0]
    else:
        waitt = 2

    db = (DATADIR + group + '.waittime.rrd').encode('ascii')  # UTF-8 gives rrdtool update err

    rrdtool.update(db, 'N:' + repr(int(waitt)))

# rollover : avg core turnover in last 60 seconds; jobs completing!

#selstr_dbg = 'SELECT jobid, comp_time, typeof(comp_time) from jobs' + \
#             ' where comp_time <> "None"'
#cu.execute(selstr_dbg)
#rl = cu.fetchall()
#print rl

selstr = 'select comp_time from jobs where comp_time<>"None" ' + \
         'and comp_time > ' + repr(now - 60)

cu.execute(selstr)
rl = cu.fetchall()

# print selstr
# print rl

if len(rl) > 0:
    stl = list()
    for r in rl:
        stl.append(r[0])
    rollover = 60.0 / len(stl)
else:
    selstr = 'select comp_time from jobs where comp_time<>"None"'
    cu.execute(selstr)
    rl = cu.fetchall()
    if len(rl) > 1:
        stl = list()
        for r in rl:
            stl.append(r[0])
        stl.sort()
        rollover = (now - stl[-2]) / 2.
    elif len(rl) == 1:
        rollover = now - rl[0][0]
        if DEBUG: print 'used time since last start'
    else:
        rollover = 600  # if nothing seen, all we know is, not in last ten minutes (keep_completed)
        if DEBUG:
            print 'set to 600, no comp_times seen'
            for tj in newjlist:
                print tj

# print "len(stl), rollover:", len(stl), rollover
rrdtool.update(DATADIR + "rollover" + '.waittime.rrd',
               'N:' + repr(rollover))

# lastroll : last recorded start in system

selstr = 'select start_time from jobs where start_time<>"None"'
cu.execute(selstr)
rl = cu.fetchall()
if len(rl) > 0:
    stl = list()
    for r in rl:
        stl.append(r[0])

    stl.sort()
    lastroll = now - stl[-1]
    # DEBUG print "now, last, lastroll", now, stl[-1], lastroll
    if lastroll < 1:
        lastroll = 1
else:
    lastroll = 6000  # signal that no starts are recorded in system.

rrdtool.update(DATADIR + "lastroll" + '.waittime.rrd',
               'N:' + repr(int(lastroll)))

cx.close()

### Local Variables: ***
### mode: python ***
### End: ***
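
The script only updates the per-group RRD files under DATADIR and assumes they
already exist. A minimal sketch of how one such single-gauge database could be
created (the group name, data-source name, step, and retention below are
illustrative assumptions, not taken from this repository):

    import rrdtool

    # hypothetical creation of one of the *.queued.rrd files the script updates;
    # DS name, step and RRA retention are illustrative assumptions
    rrdtool.create('/home/someuser/ndpfdata/atlas.queued.rrd',
                   '--step', '300',            # one sample every 5 minutes (assumed)
                   'DS:jobs:GAUGE:600:0:U',    # single gauge, 600 s heartbeat
                   'RRA:AVERAGE:0.5:1:2016')   # one week of 5-minute averages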

Properties

Name Value
svn:executable *
svn:keywords Id URL
