/[pdpsoft]/nl.nikhef.ndpf.groupviews/trunk/ndpf-gv-dbupdate
ViewVC logotype

Contents of /nl.nikhef.ndpf.groupviews/trunk/ndpf-gv-dbupdate

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2353 - (show annotations) (download)
Fri Jul 22 12:32:58 2011 UTC (11 years ago) by templon
File size: 6055 byte(s)
now also does running jobs in DB.

1 #!/usr/bin/python2
2 # $Id: ndpf-gv-dbupdate,v 1.1 2011/07/21 13:26:07 templon Exp $
3 # Source: $URL: svn+ssh://svn@ndpfsvn.nikhef.nl/repos/pdpsoft/nl.nikhef.pdp.dynsched/trunk/lcg-info-dynamic-scheduler.cin $
4 # J. A. Templon, NIKHEF/PDP 2011
5
6 # purpose: update rrd databases for ndpf group views
7 # program runs qstat, parses the output to find number of waiting and running jobs per group
8 # this is added to an RRD database
9
10 import os
11 import tempfile
12 qsfname = tempfile.mktemp(".txt","qsmf",os.environ['HOME']+"/tmp")
13
14 TORQUE="stro.nikhef.nl"
15 DATADIR=os.environ['HOME'] + '/ndpfdata/'
16 WAITDB = DATADIR + 'ndpfwaiting.rrd'
17 RUNDB = DATADIR + 'ndpfrunning.rrd'
18
19 grouplist = ['alicesgm',
20 'atlb',
21 'atlas',
22 'atlaspil',
23 'auger',
24 'biome',
25 'enmr',
26 'esr',
27 'lhcbpil',
28 'lhcbprd',
29 'lsgrid',
30 'ncf',
31 'palice',
32 'patlas',
33 'pdzero',
34 'vlemed'
35 ]
36
37 os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)
38
39 import torqueJobs
40 jlist = torqueJobs.qs_parsefile(qsfname)
41
42 os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')
43
44 import torqueAttMappers as tam
45 usedkeys = {'egroup' : 'varchar', 'jobid' : 'varchar', 'queue' : 'varchar', 'job_state' : 'varchar',
46 'euser' : 'varchar', 'qtime' : 'float', 'start_time' : 'float' }
47 def mapatts(indict):
48 sdict = tam.sub_dict(indict,usedkeys.keys())
49 odict = dict()
50 for k in sdict.keys():
51 if k in tam.tfl2 and sdict[k]:
52 secs = tam.tconv(sdict[k])
53 sdict[k] = secs
54 elif k == 'job_state':
55 statelett = sdict[k]
56 if statelett in ['Q','W']:
57 sdict[k] = 'queued'
58 elif statelett in ['R','E']:
59 sdict[k] = 'running'
60 if sdict[k]:
61 odict[k] = sdict[k]
62 return odict
63
64 newjlist = list()
65 for j in jlist:
66 newjlist.append(mapatts(j))
67
68 import sqlite
69 cx = sqlite.connect(":memory:")
70 cu = cx.cursor()
71
72 # build table creation string
73
74 crstr = 'create table jobs ('
75 ctr = 0
76 for k in usedkeys.keys():
77 if ctr > 0 : crstr = crstr + ','
78 crstr = crstr + "%s %s" % (k, usedkeys[k]) ; ctr += 1
79 crstr += ')'
80
81 # create the table
82
83 cu.execute(crstr)
84
85 # insert all the jobs found into the table
86
87 for jd in newjlist:
88
89 # build insert string
90 kl = jd.keys()
91 insstr = 'insert into jobs ('
92 ctr = 0
93 for k in kl:
94 if ctr > 0 : insstr += ','
95 insstr += k
96 ctr += 1
97 insstr += ') values ('
98 ctr = 0
99 for k in kl:
100 if ctr > 0 : insstr += ','
101 insstr += repr(jd[k])
102 ctr += 1
103 insstr += ')'
104
105 cu.execute(insstr)
106 # print insstr
107
108 # find the list of all queued jobs vs. group
109
110 queuedjobs = dict()
111
112 selstr = 'select egroup,count(*) from jobs where job_state="queued" group by egroup'
113 cu.execute(selstr)
114 rl = cu.fetchall()
115 for r in rl:
116 queuedjobs[r[0]] = r[1]
117
118 # build rrdupdate string
119 for group in queuedjobs.keys():
120 cmdstr = "rrdtool update " + DATADIR + group + '.queued.rrd N:' + repr(queuedjobs[group])
121 os.system(cmdstr)
122
123 # what it should look like when python binding works
124
125 # import rrdtool
126 # # build rrdupdate string
127 # for group in queuedjobs.keys():
128 # rrdtool.update(DATADIR + group + '.queued.rrd',
129 # 'N:' + repr(queuedjobs[group]))
130
131 runningjobs = dict()
132 selstr = 'select egroup,count(*) from jobs where job_state="running" group by egroup'
133 cu.execute(selstr)
134 rl = cu.fetchall()
135 for r in rl:
136 runningjobs[r[0]] = r[1]
137
138 # build rrdupdate string
139 for group in runningjobs.keys():
140 cmdstr = "rrdtool update " + DATADIR + group + '.running.rrd N:' + repr(runningjobs[group])
141 os.system(cmdstr)
142
143
144 # print queuedjobs
145 # print queuedjobs.keys()
146 # print queuedjobs.values()
147 # of = open(os.environ['HOME']+'/tmp/groups.rdata2',mode='a')
148 # of.write("%s\n" % (nowstr))
149 # for r in rl:
150 # of.write(repr(r) + '\n')
151 # of.write('\n')
152 # of.close()
153
154 # selstr = 'select egroup,count(*) from jobs where job_state="queued" group by egroup'
155 # cu.execute(selstr)
156 # rl = cu.fetchall()
157 # of = open(os.environ['HOME']+'/tmp/groups.qdata2',mode='a')
158 # of.write("%s\n" % (nowstr))
159 # for r in rl:
160 # of.write(repr(r) + '\n')
161 # of.write('\n')
162 # of.close()
163
164 import time
165 now = time.mktime(time.localtime())
166 nowstr = time.strftime("%b-%d-%Y-%H:%M:%S",time.localtime())
167 for g in grouplist:
168 selstr = 'select qtime from jobs where job_state="queued" and egroup="' + g + '"'
169 cu.execute(selstr)
170 rl = cu.fetchall()
171 if len(rl) > 0:
172 qtl = list()
173 for r in rl:
174 qtl.append(r[0])
175 qtl.sort()
176 waitt = now - qtl[0]
177 else:
178 waitt = 2
179
180 of = open(os.environ['HOME']+'/tmp/'+g+'.wdata2',mode='a')
181 of.write("%s %s %6d\n" % (nowstr, g, waitt))
182 of.close()
183
184 of = open(os.environ['HOME']+'/tmp/'+g+'.qdata2',mode='a')
185 of.write("%s %s %6d\n" % (nowstr, g, len(rl)))
186 of.close()
187
188 selstr = 'select start_time from jobs where job_state="running" and start_time<>"None"'
189 cu.execute(selstr)
190 rl = cu.fetchall()
191 if len(rl) > 0:
192 stl = list()
193 for r in rl:
194 stl.append(r[0])
195
196 stl.sort()
197 lastroll = now - stl[-1]
198 if lastroll < 2:
199 lastroll = 2
200
201 of = open(os.environ['HOME']+'/tmp/core.wdata2',mode='a')
202 of.write("%s %s %6d\n" % (nowstr, 'core', lastroll))
203 of.close()
204
205 selstr = 'select start_time from jobs where job_state="queued"'
206 cu.execute(selstr)
207 rl = cu.fetchall()
208 of = open(os.environ['HOME']+'/tmp/total.qdata2',mode='a')
209 of.write("%s %s %6d\n" % (nowstr, 'total', len(rl)))
210 of.close()
211
212 selstr = 'select egroup,count(*) from jobs where job_state="queued" group by egroup'
213 cu.execute(selstr)
214 rl = cu.fetchall()
215 of = open(os.environ['HOME']+'/tmp/groups.qdata2',mode='a')
216 of.write("%s\n" % (nowstr))
217 for r in rl:
218 of.write(repr(r) + '\n')
219 of.write('\n')
220 of.close()
221
222 ### Local Variables: ***
223 ### mode: python ***
224 ### End: ***
225

Properties

Name Value
svn:executable *

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28