/[pdpsoft]/nl.nikhef.ndpf.groupviews/branches/RB-2.1.1/ndpf-gv-dbupdate
ViewVC logotype

Contents of /nl.nikhef.ndpf.groupviews/branches/RB-2.1.1/ndpf-gv-dbupdate

Parent Directory | Revision Log


Revision 2554 - (show annotations) (download)
Tue Jun 26 07:50:24 2012 UTC (10 years, 1 month ago) by templon
File size: 5568 byte(s)
commit updates from a long time ago.

1 #!/usr/bin/python2
2 # $Id$
3 # Source: $URL$
4 # J. A. Templon, NIKHEF/PDP 2011
5
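6 # purpose: update the RRD databases behind the NDPF group views.
7 # The program runs qstat and parses its output to count waiting (queued) and running jobs per group,
8 # then records these counts, the longest per-group wait, job rollover and time since the last job start in RRD databases.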
9
import os
import shutil
import subprocess
import sys
import tempfile
import time

TORQUE = "stro.nikhef.nl"
DATADIR = os.environ['HOME'] + '/ndpfdata/'
WAITDB = DATADIR + 'ndpfwaiting.rrd'
RUNDB = DATADIR + 'ndpfrunning.rrd'

# Capture a full qstat listing in a private temporary file.  mkstemp creates
# the file atomically; mktemp only returned a name, leaving a window in which
# another process could create (or symlink) that path first.
qsfd, qsfname = tempfile.mkstemp(".txt", "qsmf", os.environ['HOME'] + "/tmp")
qsfile = os.fdopen(qsfd, 'w')
try:
    try:
        # Argument list instead of a shell command string: no shell involved.
        qsrc = subprocess.call(['/usr/bin/qstat', '-f', '@' + TORQUE],
                               stdout=qsfile)
    except OSError:
        qsrc = 127  # qstat could not be executed at all
finally:
    qsfile.close()
now = time.mktime(time.localtime())

if qsrc != 0:
    # A failed qstat leaves an empty or partial listing.  Parsing it would
    # record "zero jobs" in every RRD, so skip this update cycle instead.
    os.remove(qsfname)
    sys.exit('ndpf-gv-dbupdate: qstat exited with status %d; '
             'databases not updated' % qsrc)

import torqueJobs
jlist = torqueJobs.qs_parsefile(qsfname)

# Keep the most recent qstat listing around for inspection.
shutil.move(qsfname, os.environ['HOME'] + '/tmp/qstat.last.txt')

import torqueAttMappers as tam

# Job attributes kept from each qstat record, mapped to the SQL column type
# they get in the in-memory "jobs" table.
usedkeys = dict(
    egroup='varchar',
    jobid='varchar',
    queue='varchar',
    job_state='varchar',
    euser='varchar',
    qtime='float',
    start_time='float',
    comp_time='float',
)
def mapatts(indict):
    """Reduce one parsed qstat job record to the attributes listed in usedkeys.

    Attributes named in tam.tfl2 that have a value are passed through
    tam.tconv (presumably converting a time string to seconds -- confirm in
    torqueAttMappers).  The one-letter job_state is collapsed to 'queued'
    for Q/W and 'running' for R/E; other state letters are kept unchanged.
    Attributes whose final value is false/empty are left out of the result.
    """
    picked = tam.sub_dict(indict, usedkeys.keys())
    result = dict()
    for key in picked.keys():
        value = picked[key]
        if key in tam.tfl2 and value:
            value = tam.tconv(value)
            picked[key] = value
        elif key == 'job_state':
            if value in ('Q', 'W'):
                value = 'queued'
            elif value in ('R', 'E'):
                value = 'running'
            picked[key] = value
        if not value:
            continue
        result[key] = value
    return result
48
# Normalised attribute dict for every job reported by qstat.
newjlist = [mapatts(j) for j in jlist]

import sqlite3

# In-memory SQLite database with one row per job from this qstat run; the
# per-group summaries further down are SQL queries against it.
cx = sqlite3.connect(":memory:")
# Hand TEXT back as the native str type.  Group names end up in RRD file
# names and rrdtool arguments, and under Python 2 sqlite3 would otherwise
# return unicode objects (and reject non-ASCII byte strings on insert).
cx.text_factory = str
cu = cx.cursor()

# Create the jobs table with one column per attribute in usedkeys.
crstr = 'create table jobs (%s)' % ', '.join(
    ['%s %s' % (k, usedkeys[k]) for k in usedkeys.keys()])
cu.execute(crstr)

# Insert every job.  Each job only sets the attributes mapatts kept for it;
# the remaining columns stay NULL.  Column names come from usedkeys
# (assuming tam.sub_dict returns only the requested keys -- confirm).
# Values are bound as query parameters rather than pasted into the SQL text
# with repr(), which broke on values containing quotes and on u'...' strings.
for jd in newjlist:
    kl = list(jd.keys())
    if not kl:
        continue  # nothing known about this job; "insert ... ()" is invalid SQL
    insstr = 'insert into jobs (%s) values (%s)' % (
        ', '.join(kl), ', '.join(['?'] * len(kl)))
    cu.execute(insstr, [jd[k] for k in kl])
92
import glob
import rrdtool


def _count_by_group(state):
    """Return {egroup: number of jobs whose job_state is `state`}.

    `state` is one of the normalised states produced by mapatts ('queued' or
    'running'); it is only ever a constant from this file, so formatting it
    into the SQL text is safe.  Jobs without an egroup are counted under the
    key None.  A synthetic 'total' entry holds the sum over all groups.
    """
    # Single-quoted SQL string literal: double quotes denote identifiers in
    # SQL and only work here through a deprecated SQLite fallback.
    cu.execute("select egroup, count(*) from jobs where job_state='%s' "
               "group by egroup" % state)
    counts = dict()
    for r in cu.fetchall():
        counts[r[0]] = r[1]
    counts['total'] = sum(counts.values())
    return counts


def _update_state_rrds(counts, suffix):
    """Record counts in every existing DATADIR/<group><suffix> RRD file.

    Only RRD files that already exist are updated; a group with a file but
    no jobs in this state is recorded as 0.
    """
    for db in glob.glob(DATADIR + '*' + suffix):
        group = db[len(DATADIR):-len(suffix)]
        # '%d' rather than repr(): repr() of a Python 2 long ends in 'L',
        # which rrdtool cannot parse.
        rrdtool.update(db, 'N:%d' % counts.get(group, 0))


# Queued jobs per group (queuedjobs is also used for the wait times below).
queuedjobs = _count_by_group('queued')
_update_state_rrds(queuedjobs, '.queued.rrd')

# Same for the running jobs.
runningjobs = _count_by_group('running')
_update_state_rrds(runningjobs, '.running.rrd')
138
# Longest wait per group.  For each group with queued jobs, the number of
# seconds since the qtime of its oldest queued job is written to
# DATADIR/<group>.waittime.rrd.  The synthetic 'total' entry of queuedjobs
# gets the oldest queued job over all groups.

oldest_qtime = dict()
cu.execute("select egroup, qtime from jobs where job_state='queued'")
for r in cu.fetchall():
    if r[1] is None:
        continue  # no qtime recorded; previously this crashed on now - None
    qt = float(r[1])
    if r[0] not in oldest_qtime or qt < oldest_qtime[r[0]]:
        oldest_qtime[r[0]] = qt

for group in queuedjobs.keys():
    if group is None:
        continue  # queued jobs without an egroup have no per-group RRD file
    if group == 'total':
        qtimes = list(oldest_qtime.values())
    elif group in oldest_qtime:
        qtimes = [oldest_qtime[group]]
    else:
        qtimes = []
    if qtimes:
        waitt = now - min(qtimes)
    else:
        waitt = 2  # nothing with a known qtime is queued for this group
    dbfile = DATADIR + group + '.waittime.rrd'
    if not os.path.exists(dbfile):
        # A new group without an RRD yet: rrdtool.update would raise and
        # abort the run before the rollover/lastroll updates.
        continue
    rrdtool.update(dbfile, 'N:%d' % int(waitt))
157
# rollover: average number of seconds per completed job, estimated from the
# comp_time of the jobs qstat still lists.
#   - completions in the last 60 s:  60 / (number of such completions)
#   - otherwise, >= 2 completions:   (now - second most recent) / 2
#   - otherwise, 1 completion:       now - that completion
#   - otherwise:                     600 (comment below: keep_completed window)

# Explicit NULL test plus a single-quoted literal; the old "None" in double
# quotes only worked through SQLite's deprecated double-quoted-string fallback.
cu.execute("select comp_time from jobs "
           "where comp_time is not null and comp_time <> 'None'")
comp_times = [float(r[0]) for r in cu.fetchall()]
recent = [t for t in comp_times if t > now - 60]

if recent:
    rollover = 60.0 / len(recent)
elif len(comp_times) > 1:
    comp_times.sort()
    rollover = (now - comp_times[-2]) / 2.
elif len(comp_times) == 1:
    rollover = now - comp_times[0]
    print('used time since last start')
else:
    rollover = 600  # if nothing seen, all we know is, not in last ten minutes (keep_completed)
    print('set to 600, no comp_times seen')

rrdtool.update(DATADIR + "rollover" + '.waittime.rrd',
               'N:%d' % int(rollover))
189
# lastroll: seconds since the most recent job start recorded in qstat,
# clamped to at least 1; 6000 signals that no start times are recorded.

# Explicit NULL test plus a single-quoted literal; the old "None" in double
# quotes only worked through SQLite's deprecated double-quoted-string fallback.
cu.execute("select start_time from jobs "
           "where start_time is not null and start_time <> 'None'")
start_times = [float(r[0]) for r in cu.fetchall()]

if start_times:
    lastroll = max(1, now - max(start_times))
else:
    lastroll = 6000  # signal that no starts are recorded in system.

rrdtool.update(DATADIR + "lastroll" + '.waittime.rrd',
               'N:%d' % int(lastroll))
209
210 ### Local Variables: ***
211 ### mode: python ***
212 ### End: ***
213

Properties

Name Value
svn:executable *
svn:keywords Id URL

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28