/[pdpsoft]/nl.nikhef.ndpf.groupviews/trunk/wait.py
ViewVC logotype

Contents of /nl.nikhef.ndpf.groupviews/trunk/wait.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2348 - (show annotations) (download) (as text)
Wed Jul 20 12:02:36 2011 UTC (11 years, 2 months ago) by templon
File MIME type: application/x-python
File size: 6146 byte(s)
working versions, first import


1 #!/usr/bin/python2
2 # wait.py
3 # fork from lcg-info-dynamic-scheduler
4 # $Id: lcg-info-dynamic-scheduler.cin 2296 2011-05-23 10:12:43Z templon $
5 # Source: $URL: svn+ssh://svn@ndpfsvn.nikhef.nl/repos/pdpsoft/nl.nikhef.pdp.dynsched/trunk/lcg-info-dynamic-scheduler.cin $
6 # J. A. Templon, NIKHEF/PDP 2011
7
8 # purpose: determine longest-waiting job for each of groups present, and append this data to a file
9 # also appends data on core rollover time to a file
10 # input is standard lrmsinfo-generic format
11 # see https://ndpfsvn.nikhef.nl/cgi-bin/viewvc.cgi/pdpsoft/nl.nikhef.pdp.dynsched/trunk/lrmsinfo-generic.txt
12
13 # define mapping of attributes in lrmsinfo-generic output to local
14
15 jobatts = {
16 'id' : { 'source' : 'jobid', 'fmt' : 'varchar(25)' },
17 'ncpu' : { 'source' : 'cpucount', 'fmt' : 'int'},
18 'maxwall' : { 'source' : 'maxwalltime', 'fmt' : 'int'},
19 'name' : { 'source' : 'name', 'fmt' : 'varchar(32)' },
20 'qtime' : { 'source' : 'qtime', 'fmt' : 'int' },
21 'queue' : { 'source' : 'queue', 'fmt' : 'varchar(16)' },
22 'start' : { 'source' : 'start', 'fmt' : 'int' },
23 'anchor' : { 'source' : 'startAnchor', 'fmt' : 'varchar(32)' },
24 'state' : { 'source' : 'state', 'fmt' : 'varchar(8)' },
25 'usr' : { 'source' : 'user', 'fmt' : 'varchar(8)' },
26 'grp' : { 'source' : 'group', 'fmt' : 'varchar(8)' },
27 'wall' : { 'source' : 'walltime', 'fmt': 'int'}
28 }
29
30 # create reverse mapping source->local
31 revmap = dict()
32 for k in jobatts.keys():
33 revmap[jobatts[k]['source']] = k
34
35 def usage():
36 print "Usage: wait.py -g grplist -<o output_dir> infile"
37 print " grplist is like atlas,alice,lhcb"
38 print " default for output_dir is $HOME/tmp"
39
40 def abort_without_output(msg):
41 print msg
42 print "Exiting without output"
43 sys.exit(2)
44
45 import sys
46
47 import getopt
48 import string
49
50 try:
51 opts, args = getopt.getopt(sys.argv[1:], "o:g:",
52 ["output_dir=","group_list="])
53 except getopt.GetoptError:
54 # print help information and exit:
55 emsg = "Error parsing command line: " + string.join(sys.argv)
56 usage()
57 abort_without_output(emsg)
58
59 import os
60 odir = os.environ['HOME'] + '/tmp/'
61 grplist = None
62
63 for o, a in opts:
64 if o in ("-o", "--output_dir"):
65 odir = a
66 elif o in ("-g", "--group_list"):
67 grplist = a.split(",")
68
69 if not grplist:
70 usage()
71 abort_without_output('Please provide a valid group list')
72
73 try:
74 retval = os.stat(odir)
75 except OSError:
76 abort_without_output("Specified output directory does not exist: " + odir)
77
78 def tconv(timeval): # convert time to desired units
79 timeval = min(timeval, 2146060842)
80 return repr( int(timeval) ) # seconds->printable
81
82 if len(args) < 1:
83 abort_without_output("Please specify an input file")
84 else:
85 infh = open(args[0],mode='r')
86
87 lrmlines=infh.readlines()
88
89 import sqlite
90 cx = sqlite.connect(":memory:")
91 cu = cx.cursor()
92
93 # build table creation string
94
95 crstr = 'create table jobs ('
96 ctr = 0
97 for k in jobatts.keys():
98 if ctr > 0 : crstr = crstr + ','
99 crstr = crstr + "%s %s" % (k, jobatts[k]['fmt']) ; ctr += 1
100 crstr += ')'
101
102 cu.execute(crstr)
103
104 for line in lrmlines:
105 s = line.strip()
106 f = s.split()
107 if s[0] == '{' and s[-1] == '}': # looks like a dict
108 jd = eval(s, {"__builtins__" : {}})
109
110 # build insert string
111 kl = jd.keys()
112 insstr = 'insert into jobs ('
113 ctr = 0
114 for k in kl:
115 if ctr > 0 : insstr += ','
116 insstr += revmap[k]
117 ctr += 1
118 insstr += ') values ('
119 ctr = 0
120 for k in kl:
121 if ctr > 0 : insstr += ','
122 insstr += repr(jd[k])
123 ctr += 1
124 insstr += ')'
125
126 cu.execute(insstr)
127
128 elif len(f) == 2:
129 if f[0] == 'nactive' : slotsUp = int(f[1])
130 elif f[0] == 'nfree' : slotsFree = int(f[1])
131 elif f[0] == 'now' : now = int(f[1])
132 elif f[0] == 'schedCycle' : schedCycle = int(f[1])
133 else:
134 logging.warn("invalid input in result of lrms plugin command " + cmd)
135 logging.warn("invalid input: " + s)
136 else:
137 logging.warn("invalid input in result of lrms plugin command " + cmd)
138 logging.warn("invalid input: " + s)
139
140 import time
141 nowstr = time.strftime("%b-%d-%Y-%H:%M:%S",time.localtime(now))
142 for g in grplist:
143 selstr = 'select qtime from jobs where state="queued" and grp="' + g + '"'
144 cu.execute(selstr)
145 rl = cu.fetchall()
146 if len(rl) > 0:
147 qtl = list()
148 for r in rl:
149 qtl.append(r[0])
150 qtl.sort()
151 waitt = now - qtl[0]
152 else:
153 waitt = 2
154
155 of = open(os.environ['HOME']+'/tmp/'+g+'.wdata',mode='a')
156 of.write("%s %s %6d\n" % (nowstr, g, waitt))
157 of.close()
158
159 of = open(os.environ['HOME']+'/tmp/'+g+'.qdata',mode='a')
160 of.write("%s %s %6d\n" % (nowstr, g, len(rl)))
161 of.close()
162
163 selstr = 'select start from jobs where state="running" and start<>"None"'
164 cu.execute(selstr)
165 rl = cu.fetchall()
166 if len(rl) > 0:
167 stl = list()
168 for r in rl:
169 stl.append(r[0])
170
171 stl.sort()
172 lastroll = now - stl[-1]
173 if lastroll < 2:
174 lastroll = 2
175
176 of = open(os.environ['HOME']+'/tmp/core.wdata',mode='a')
177 of.write("%s %s %6d\n" % (nowstr, 'core', lastroll))
178 of.close()
179
180 selstr = 'select start from jobs where state="queued"'
181 cu.execute(selstr)
182 rl = cu.fetchall()
183 of = open(os.environ['HOME']+'/tmp/total.qdata',mode='a')
184 of.write("%s %s %6d\n" % (nowstr, 'total', len(rl)))
185 of.close()
186
187 selstr = 'select grp,count(*) from jobs where state="running" group by grp'
188 cu.execute(selstr)
189 rl = cu.fetchall()
190 of = open(os.environ['HOME']+'/tmp/groups.rdata',mode='a')
191 of.write("%s\n" % (nowstr))
192 for r in rl:
193 of.write(repr(r) + '\n')
194 of.write('\n')
195 of.close()
196
197 selstr = 'select grp,count(*) from jobs where state="queued" group by grp'
198 cu.execute(selstr)
199 rl = cu.fetchall()
200 of = open(os.environ['HOME']+'/tmp/groups.qdata',mode='a')
201 of.write("%s\n" % (nowstr))
202 for r in rl:
203 of.write(repr(r) + '\n')
204 of.write('\n')
205 of.close()
206
207 ### Local Variables: ***
208 ### mode: python ***
209 ### End: ***
210

Properties

Name Value
svn:executable *

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28