/[pdpsoft]/nl.nikhef.ndpf.groupviews/tags/REL-1.0.0/wait.py
ViewVC logotype

Annotation of /nl.nikhef.ndpf.groupviews/tags/REL-1.0.0/wait.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2349 - (hide annotations) (download) (as text)
Thu Jul 21 09:51:43 2011 UTC (11 years, 2 months ago) by templon
File MIME type: application/x-python
File size: 6146 byte(s)
Tag Release 1.0
1 templon 2348 #!/usr/bin/python2
2     # wait.py
3     # fork from lcg-info-dynamic-scheduler
4     # $Id: lcg-info-dynamic-scheduler.cin 2296 2011-05-23 10:12:43Z templon $
5     # Source: $URL: svn+ssh://svn@ndpfsvn.nikhef.nl/repos/pdpsoft/nl.nikhef.pdp.dynsched/trunk/lcg-info-dynamic-scheduler.cin $
6     # J. A. Templon, NIKHEF/PDP 2011
7    
8     # purpose: determine longest-waiting job for each of groups present, and append this data to a file
9     # also appends data on core rollover time to a file
10     # input is standard lrmsinfo-generic format
11     # see https://ndpfsvn.nikhef.nl/cgi-bin/viewvc.cgi/pdpsoft/nl.nikhef.pdp.dynsched/trunk/lrmsinfo-generic.txt
12    
13     # define mapping of attributes in lrmsinfo-generic output to local
14    
# Schema of the in-memory "jobs" table.  Each key is a local column name;
# 'source' is the attribute name used in the lrmsinfo-generic job records
# and 'fmt' is the SQL column type used when the table is created.
jobatts = {
    'id':      {'source': 'jobid',       'fmt': 'varchar(25)'},
    'ncpu':    {'source': 'cpucount',    'fmt': 'int'},
    'maxwall': {'source': 'maxwalltime', 'fmt': 'int'},
    'name':    {'source': 'name',        'fmt': 'varchar(32)'},
    'qtime':   {'source': 'qtime',       'fmt': 'int'},
    'queue':   {'source': 'queue',       'fmt': 'varchar(16)'},
    'start':   {'source': 'start',       'fmt': 'int'},
    'anchor':  {'source': 'startAnchor', 'fmt': 'varchar(32)'},
    'state':   {'source': 'state',       'fmt': 'varchar(8)'},
    'usr':     {'source': 'user',        'fmt': 'varchar(8)'},
    'grp':     {'source': 'group',       'fmt': 'varchar(8)'},
    'wall':    {'source': 'walltime',    'fmt': 'int'}
}
29    
# Reverse lookup: lrmsinfo-generic attribute name -> local column name.
revmap = dict([(spec['source'], local) for local, spec in jobatts.items()])
34    
def usage():
    """Print a short command-line synopsis to standard output."""
    # -o is optional (a default output directory is set at start-up), so it
    # is shown in conventional [optional] brackets.
    print("Usage: wait.py -g grplist [-o output_dir] infile")
    print(" grplist is like atlas,alice,lhcb")
    print(" default for output_dir is $HOME/tmp")
39    
def abort_without_output(msg):
    """Print msg and a fixed notice, then terminate with exit status 2."""
    print(msg)
    print("Exiting without output")
    # Same effect as sys.exit(2): raises SystemExit carrying code 2.
    raise SystemExit(2)
44    
45     import sys
46    
47     import getopt
48     import string
49    
# Parse the command line: -o/--output_dir and -g/--group_list each take a
# value; whatever remains is left in args.
try:
    opts, args = getopt.getopt(
        sys.argv[1:],
        "o:g:",
        ["output_dir=", "group_list="],
    )
except getopt.GetoptError:
    # Show the synopsis first, then report the offending command line and exit.
    usage()
    abort_without_output("Error parsing command line: " + " ".join(sys.argv))
58    
59     import os
# Default output directory; overridden by -o/--output_dir.
odir = os.environ['HOME'] + '/tmp/'
grplist = None

for o, a in opts:
    if o in ("-o", "--output_dir"):
        odir = a
    elif o in ("-g", "--group_list"):
        # Drop empty entries and surrounding blanks so that input such as
        # "-g ," or "-g atlas,,lhcb" cannot yield a group named "", which
        # would otherwise pass the check below.
        grplist = [g.strip() for g in a.split(",") if g.strip()]

# A group list is mandatory; None and an empty list are both rejected.
if not grplist:
    usage()
    abort_without_output('Please provide a valid group list')
72    
# The output directory must already exist.  os.path.isdir also rejects a
# path that exists but is a regular file, which a bare os.stat() accepted.
if not os.path.isdir(odir):
    abort_without_output("Specified output directory does not exist: " + odir)
77    
def tconv(timeval):
    """Return timeval, capped at 2146060842 and truncated to int, as a string."""
    ceiling = 2146060842
    if ceiling < timeval:
        capped = ceiling
    else:
        capped = timeval
    whole = int(capped)  # drop any fractional part
    return repr(whole)
81    
# Read the whole input file (first positional argument) into lrmlines.
if len(args) < 1:
    abort_without_output("Please specify an input file")

try:
    infh = open(args[0], mode='r')
except IOError:
    # Report an unreadable or missing file through the script's normal abort
    # path instead of an uncaught traceback.
    abort_without_output("Cannot open input file: " + args[0])

try:
    lrmlines = infh.readlines()
finally:
    # Always release the file handle, even if reading fails.
    infh.close()
88    
89     import sqlite
# In-memory database that holds one row per job record.
cx = sqlite.connect(":memory:")
cu = cx.cursor()

# Build the CREATE TABLE statement: one "<column> <type>" entry per jobatts key.
coldefs = ["%s %s" % (col, jobatts[col]['fmt']) for col in jobatts.keys()]
crstr = 'create table jobs (' + ','.join(coldefs) + ')'

cu.execute(crstr)
103    
# Parse the input lines.  A line that starts with '{' and ends with '}' is
# treated as a job record and inserted into the jobs table; a two-word line
# is a "keyword value" pair (nactive, nfree, now, schedCycle).  Anything else
# is reported as invalid and skipped.
import logging  # used by the warning branches below; not imported elsewhere

for line in lrmlines:
    s = line.strip()
    if not s:
        # Blank line: nothing to parse, and s[0] below would raise IndexError.
        continue
    f = s.split()
    if s[0] == '{' and s[-1] == '}':  # looks like a dict
        # NOTE(review): eval() on file content is unsafe if the input is not
        # fully trusted; emptying __builtins__ is not a real sandbox.
        # Consider ast.literal_eval once the minimum Python version allows.
        jd = eval(s, {"__builtins__": {}})

        # Only attributes known to revmap have a column in the table.
        unknown = [k for k in jd.keys() if k not in revmap]
        if unknown:
            logging.warning("ignoring unknown job attributes %s in: %s"
                            % (unknown, s))
        kl = [k for k in jd.keys() if k in revmap]
        if not kl:
            continue

        # build insert string
        # NOTE(review): values are embedded via repr() rather than bound as
        # parameters; kept as-is because the later 'start<>"None"' query
        # appears to rely on how repr(None) ends up stored - confirm before
        # switching to bound parameters.
        columns = ','.join([revmap[k] for k in kl])
        values = ','.join([repr(jd[k]) for k in kl])
        insstr = 'insert into jobs (' + columns + ') values (' + values + ')'

        cu.execute(insstr)

    elif len(f) == 2:
        if f[0] == 'nactive':
            slotsUp = int(f[1])
        elif f[0] == 'nfree':
            slotsFree = int(f[1])
        elif f[0] == 'now':
            now = int(f[1])
        elif f[0] == 'schedCycle':
            schedCycle = int(f[1])
        else:
            logging.warning("invalid input in file " + args[0])
            logging.warning("invalid input: " + s)
    else:
        logging.warning("invalid input in file " + args[0])
        logging.warning("invalid input: " + s)
139    
140     import time
# Timestamp label, derived from the 'now' value read from the input, that is
# written at the start of every record appended below.
nowstr = time.strftime("%b-%d-%Y-%H:%M:%S", time.localtime(now))

# For every requested group append two records: the wait time of its
# longest-queued job (<group>.wdata) and its number of queued jobs
# (<group>.qdata).  Files go to odir, the directory selected with -o and
# validated at start-up, instead of a hard-coded $HOME/tmp.
for g in grplist:
    # NOTE(review): the group name is spliced into the SQL text unescaped;
    # a name containing a double quote would break the query.
    selstr = 'select qtime from jobs where state="queued" and grp="' + g + '"'
    cu.execute(selstr)
    rl = cu.fetchall()
    if len(rl) > 0:
        # The earliest queue time belongs to the longest-waiting job.
        waitt = now - min([r[0] for r in rl])
    else:
        waitt = 2  # placeholder used when the group has no queued jobs

    of = open(os.path.join(odir, g + '.wdata'), mode='a')
    try:
        of.write("%s %s %6d\n" % (nowstr, g, waitt))
    finally:
        of.close()

    of = open(os.path.join(odir, g + '.qdata'), mode='a')
    try:
        of.write("%s %s %6d\n" % (nowstr, g, len(rl)))
    finally:
        of.close()
162    
# Core rollover: seconds between 'now' and the most recent start time among
# running jobs, appended to core.wdata in odir.
selstr = 'select start from jobs where state="running" and start<>"None"'
cu.execute(selstr)
rl = cu.fetchall()
if len(rl) > 0:
    lastroll = now - max([r[0] for r in rl])
    if lastroll < 2:
        lastroll = 2  # values below 2 are raised to 2

    of = open(os.path.join(odir, 'core.wdata'), mode='a')
    try:
        of.write("%s %s %6d\n" % (nowstr, 'core', lastroll))
    finally:
        of.close()
# With no running job there is no start time to measure from, so no record
# is written (previously this path failed with NameError on 'lastroll').
179    
# Total number of queued jobs over all groups, appended to total.qdata in
# odir (the directory selected with -o, not a hard-coded $HOME/tmp).
selstr = 'select start from jobs where state="queued"'
cu.execute(selstr)
rl = cu.fetchall()
of = open(os.path.join(odir, 'total.qdata'), mode='a')
try:
    of.write("%s %s %6d\n" % (nowstr, 'total', len(rl)))
finally:
    of.close()
186    
# Number of running jobs per group, appended to groups.rdata in odir as a
# timestamp line, one repr()'d result row per group, and a blank separator.
selstr = 'select grp,count(*) from jobs where state="running" group by grp'
cu.execute(selstr)
rl = cu.fetchall()
of = open(os.path.join(odir, 'groups.rdata'), mode='a')
try:
    of.write("%s\n" % (nowstr))
    for r in rl:
        of.write(repr(r) + '\n')
    of.write('\n')
finally:
    of.close()
196    
# Number of queued jobs per group, appended to groups.qdata in odir as a
# timestamp line, one repr()'d result row per group, and a blank separator.
selstr = 'select grp,count(*) from jobs where state="queued" group by grp'
cu.execute(selstr)
rl = cu.fetchall()
of = open(os.path.join(odir, 'groups.qdata'), mode='a')
try:
    of.write("%s\n" % (nowstr))
    for r in rl:
        of.write(repr(r) + '\n')
    of.write('\n')
finally:
    of.close()
206    
207     ### Local Variables: ***
208     ### mode: python ***
209     ### End: ***
210    

Properties

Name Value
svn:executable *

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28