/[pdpsoft]/nl.nikhef.ndpf.groupviews/branches/RB-2.1.0/ndpf-gv-dbupdate
ViewVC logotype

Contents of /nl.nikhef.ndpf.groupviews/branches/RB-2.1.0/ndpf-gv-dbupdate

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2403 - (show annotations) (download)
Tue Aug 9 12:00:56 2011 UTC (11 years, 2 months ago) by templon
File size: 4443 byte(s)
create release branch 2.1
1 #!/usr/bin/python2
2 # $Id$
3 # Source: $URL$
4 # J. A. Templon, NIKHEF/PDP 2011
5
6 # purpose: update rrd databases for ndpf group views
7 # program runs qstat, parses the output to find number of waiting and running jobs per group
8 # this is added to an RRD database
9
10 import os
11 import tempfile
12 qsfname = tempfile.mktemp(".txt","qsmf",os.environ['HOME']+"/tmp")
13
14 TORQUE="stro.nikhef.nl"
15 DATADIR=os.environ['HOME'] + '/ndpfdata/'
16 WAITDB = DATADIR + 'ndpfwaiting.rrd'
17 RUNDB = DATADIR + 'ndpfrunning.rrd'
18
19 os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)
20
21 import torqueJobs
22 jlist = torqueJobs.qs_parsefile(qsfname)
23
24 os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt')
25
26 import torqueAttMappers as tam
27 usedkeys = {'egroup' : 'varchar', 'jobid' : 'varchar', 'queue' : 'varchar', 'job_state' : 'varchar',
28 'euser' : 'varchar', 'qtime' : 'float', 'start_time' : 'float' }
29 def mapatts(indict):
30 sdict = tam.sub_dict(indict,usedkeys.keys())
31 odict = dict()
32 for k in sdict.keys():
33 if k in tam.tfl2 and sdict[k]:
34 secs = tam.tconv(sdict[k])
35 sdict[k] = secs
36 elif k == 'job_state':
37 statelett = sdict[k]
38 if statelett in ['Q','W']:
39 sdict[k] = 'queued'
40 elif statelett in ['R','E']:
41 sdict[k] = 'running'
42 if sdict[k]:
43 odict[k] = sdict[k]
44 return odict
45
46 newjlist = list()
47 for j in jlist:
48 newjlist.append(mapatts(j))
49
50 import sqlite
51 cx = sqlite.connect(":memory:")
52 cu = cx.cursor()
53
54 # build table creation string
55
56 crstr = 'create table jobs ('
57 ctr = 0
58 for k in usedkeys.keys():
59 if ctr > 0 : crstr = crstr + ','
60 crstr = crstr + "%s %s" % (k, usedkeys[k]) ; ctr += 1
61 crstr += ')'
62
63 # create the table
64
65 cu.execute(crstr)
66
67 # insert all the jobs found into the table
68
69 for jd in newjlist:
70
71 # build insert string
72 kl = jd.keys()
73 insstr = 'insert into jobs ('
74 ctr = 0
75 for k in kl:
76 if ctr > 0 : insstr += ','
77 insstr += k
78 ctr += 1
79 insstr += ') values ('
80 ctr = 0
81 for k in kl:
82 if ctr > 0 : insstr += ','
83 insstr += repr(jd[k])
84 ctr += 1
85 insstr += ')'
86
87 cu.execute(insstr)
88 # print insstr
89
90 # find the list of all queued jobs vs. group
91
92 queuedjobs = dict()
93
94 selstr = 'select egroup,count(*) from jobs where job_state="queued" group by egroup'
95 cu.execute(selstr)
96 rl = cu.fetchall()
97 for r in rl:
98 queuedjobs[r[0]] = r[1]
99
100 qtot = sum(queuedjobs.values())
101 queuedjobs['total'] = qtot
102
103 import rrdtool
104 # loop over all known databases and update; with val if known, zero otherwise.
105 import glob
106 queued_files = glob.glob(DATADIR+'*.queued.rrd')
107 for db in queued_files:
108 group = db[len(DATADIR):db.find('.queued.rrd')]
109 if group in queuedjobs.keys():
110 val = queuedjobs[group]
111 else:
112 val = 0
113 rrdtool.update(db, 'N:' + repr(val))
114
115 # do it again for the running jobs
116
117 runningjobs = dict()
118 selstr = 'select egroup,count(*) from jobs where job_state="running" group by egroup'
119 cu.execute(selstr)
120 rl = cu.fetchall()
121 for r in rl:
122 runningjobs[r[0]] = r[1]
123
124 rtot = sum(runningjobs.values())
125 runningjobs['total'] = rtot
126
127 running_files = glob.glob(DATADIR+'*.running.rrd')
128 for db in running_files:
129 group = db[len(DATADIR):db.find('.running.rrd')]
130 if group in runningjobs.keys():
131 val = runningjobs[group]
132 else:
133 val = 0
134 rrdtool.update(db, 'N:' + repr(val))
135
136 import time
137 now = time.mktime(time.localtime())
138
139 # now do longest wait times
140
141 for group in queuedjobs.keys():
142 selstr = 'select qtime from jobs where job_state="queued" ' + \
143 'and egroup="' + group + '"'
144 cu.execute(selstr)
145 rl = cu.fetchall()
146 if len(rl) > 0:
147 qtl = list()
148 for r in rl:
149 qtl.append(r[0])
150 qtl.sort()
151 waitt = now - qtl[0]
152 else:
153 waitt = 2
154
155 rrdtool.update(DATADIR + group + '.waittime.rrd',
156 'N:' + repr(int(waitt)))
157
158 selstr = 'select start_time from jobs where job_state="running" and start_time<>"None"'
159 cu.execute(selstr)
160 rl = cu.fetchall()
161 if len(rl) > 0:
162 stl = list()
163 for r in rl:
164 stl.append(r[0])
165
166 stl.sort()
167 lastroll = now - stl[-1]
168 if lastroll < 2:
169 lastroll = 2
170
171 rrdtool.update(DATADIR + "rollover" + '.waittime.rrd',
172 'N:' + repr(int(lastroll)))
173
174 ### Local Variables: ***
175 ### mode: python ***
176 ### End: ***
177

Properties

Name Value
svn:executable *
svn:keywords Id URL

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28