/[pdpsoft]/nl.nikhef.pdp.dynsched-pbs-plugin/trunk/torque_utils.py
ViewVC logotype

Contents of /nl.nikhef.pdp.dynsched-pbs-plugin/trunk/torque_utils.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2028 - (show annotations) (download) (as text)
Fri Oct 8 14:50:39 2010 UTC (11 years, 11 months ago) by templon
File MIME type: application/x-python
File size: 8751 byte(s)
add Id and URL fields

1 #!/usr/bin/env python2
2 # $Id$
3 # $URL$
4
5 """
6 python utilities and wrappers for Torque.
7 """
8
# A future statement may only be preceded by the module docstring, comments,
# blank lines and other future statements, so it has to come before the
# __version__/__author__ assignments.
from __future__ import generators  # only needed in Python 2.2

import os
import re

__version__ = "0.3"
__author__ = "Davide Salomoni"

# The programs needed by these utilities. If they are not in a location
# accessible by PATH, specify their location here.
PBSNODES = "pbsnodes"
MOMCTL = "/root/momctl"
QSTAT = "qstat"
20
def pbsnodes(nodes="", ignoreError=False):
    """
    Interface to the pbsnodes command. It returns a list of nodes found by PBS.
    Specify the list of nodes as a comma-separated string.

    Example usage:

        for node in pbsnodes():
            if node.isUp():
                print node.name, "average load = ", node.status['loadave']
            else:
                print node.name, "is down"

    See the __doc__ string of the nodes returned by pbsnodes() for a list
    of the attributes and methods supported by the node object returned
    by pbsnodes.

    The following keyword arguments are supported:
    ignoreError (True/False): ignore errors in processing individual pbsnodes queries.

    If ignoreError is False and an error is encountered issuing pbsnodes commands, a
    TorqueError exception will be raised. Its message is whatever the command
    wrote to standard output, since that is the only stream read here.

    An IOError is raised if the pbsnodes program was not found when this
    module was imported.
    """
    if not _pbsnodesOK:
        raise IOError("'%s' not found" % PBSNODES)

    if not nodes:
        nodes = [""]  # meaning all nodes
    else:
        nodes = nodes.split(',')

    result = []
    for node in nodes:
        # NOTE(review): the node name is interpolated into a shell command
        # line unquoted; callers must not pass untrusted input here.
        fh = os.popen("%s -a %s" % (PBSNODES, node))
        s = fh.readlines()
        # close() returns None on success and the exit status otherwise.
        exitcode = fh.close()
        if exitcode:
            if ignoreError:
                continue
            raise TorqueError(''.join(s))

        # The output holds one blank-line separated paragraph per node.
        for entry in _paragraphs(s):
            result.append(PBSNode(entry))

    return result
69
def qstat(jobs=""):
    """
    Interface to the 'qstat -f' command. It returns a list of PBSJob objects.
    Specify the list of jobs as a comma-separated string; an empty string
    (the default) runs qstat without a job argument.

    One qstat command is issued per requested job. If any of them exits
    with a non-zero status a TorqueError is raised, carrying whatever the
    command wrote to standard output.
    """
    if not jobs:
        jobs = [""]  # meaning all jobs
    else:
        jobs = jobs.split(',')

    result = []
    for job in jobs:
        # NOTE(review): the job id is interpolated into a shell command
        # line unquoted; callers must not pass untrusted input here.
        fh = os.popen("%s -f %s" % (QSTAT, job))
        s = fh.readlines()
        # close() returns None on success and the exit status otherwise.
        exitcode = fh.close()
        if exitcode:
            raise TorqueError(''.join(s))

        # The output holds one blank-line separated paragraph per job.
        for entry in _paragraphs(s):
            result.append(PBSJob(entry))

    return result
88
def mom(node):
    """
    Interface to the MOM process on a given node.

    Returns a MOMNode built for the given node name. An IOError is raised
    if the momctl program was not found when this module was imported.
    """
    # The module-level flag is spelled _momctlOK (capital K); the previous
    # spelling _momctlOk did not exist and made every call a NameError.
    if not _momctlOK:
        raise IOError("'%s' not found" % MOMCTL)

    return MOMNode(node)
97
class PBSJob(object):
    """
    The representation of a job, built from one paragraph of text.

    The first line of the paragraph is kept verbatim as the 'header'
    attribute. Every following 'KEY = VALUE' line becomes an instance
    attribute named KEY holding VALUE as a string. Lines without ' = '
    are treated as the continuation of the previous entry.

    str(job) returns the header line.
    """
    def __init__(self, para):
        # step1, split on newlines
        lines = para.split('\n')
        self.header = lines[0]

        # step2, join lines broken on \t characters, removing the \t
        entries = []
        for brokenline in lines[1:]:
            line = re.sub('\t+', '', brokenline).strip()
            if not line:
                # blank line (e.g. after the trailing newline): nothing to add
                continue
            if line.find(' = ') != -1:
                entries.append(line)    # this is the start of a real entry
            elif entries:
                entries[-1] += line     # append this line to the previous one
            # else: continuation text with no entry to attach to; ignored

        # now set instance variables, splitting on 'KEY = VALUE'.
        # Split only once so that a VALUE containing ' = ' stays intact.
        for entry in entries:
            key, val = entry.split(' = ', 1)
            self.__dict__[key] = val

    def __str__(self):
        return self.header
121
class PBSNode(object):
    """
    The representation of a node as reported by Torque.

    Public attributes:
    name (node name:string)
    state (node state:string)
    ntype (node type:string)
    np (number of processors:string)
    properties (node properties:list)
    jobs (jobs running on node:dictionary, CPU index -> list of job names)
    status (node status:dictionary)
    numCpu (number of CPUs:integer, 0 if np is missing or not a number)
    freeCpu (number of free CPUs:integer, 0 if the node is down)

    Public methods:
    isUp() -- return True if node is up
    isDown() -- return True if node is down

    A node counts as down when its state contains 'down' or 'offline'.

    This class is not meant for direct instantiation.
    """
    def __init__(self, para):
        lines = para.splitlines()
        params = ['state', 'np', 'properties', 'ntype', 'jobs', 'status']
        self.name = lines[0]
        for line in lines[1:]:
            for param in params:
                m = re.match(r'^\s+%s = (.*)' % param, line)
                if m:
                    # params are named "_NAME", where NAME is the pbsnodes attribute
                    self.__dict__['_%s' % param] = m.group(1)
                    break   # a line holds a single attribute
        # make sure all the required params are in __dict__
        # and set e.g. state = _state. For properties, jobs and status the
        # class-level property objects take precedence over these entries.
        for param in params:
            self.__dict__.setdefault('_%s' % param, "")
            self.__dict__[param] = self.__dict__['_%s' % param]

    def __str__(self):
        return self.name

    def isUp(self):
        return (self._state.find('down') == -1 and
                self._state.find('offline') == -1)

    def isDown(self):
        return not self.isUp()

    def _getNumCpu(self):
        try:
            return int(self.np)
        except ValueError:
            return 0

    def _getFreeCpu(self):
        if self.isDown():
            return 0

        # Use the tolerant CPU count so a node without a usable 'np' yields
        # 0 instead of raising ValueError.
        myJobs = self._getJobs()
        free = 0
        for cpu in range(self._getNumCpu()):
            if not myJobs[cpu]:
                free = free + 1

        return free

    def _getProperties(self):
        # Drop empty items so a node without properties gives [] and not [''].
        result = []
        for s in self._properties.split(','):
            s = s.strip()
            if s:
                result.append(s)
        return result

    def _getJobs(self):
        d = {}
        for cpu in range(self._getNumCpu()):
            d[cpu] = []

        for entry in self._jobs.split(','):
            # Entries look like 'CPU/JOB'. Anything that does not parse
            # (including the empty string of a node without jobs) is skipped
            # rather than aborting the whole property access.
            try:
                key, val = entry.split('/', 1)
                key = int(key.strip())
            except ValueError:
                continue
            # setdefault: tolerate a CPU index outside range(np).
            d.setdefault(key, []).append(val.strip())

        return d

    def _getStatus(self):
        d = {}
        for entry in self._status.split(','):
            # Split on the first '=' only so values containing '=' survive;
            # entries without any '=' are skipped.
            try:
                key, val = entry.split('=', 1)
            except ValueError:
                continue
            d[key.strip()] = val.strip()
        return d

    # class attributes accessed via properties:
    properties = property(_getProperties)
    jobs = property(_getJobs)
    status = property(_getStatus)
    freeCpu = property(_getFreeCpu)
    numCpu = property(_getNumCpu)
214
class MOMNode(object):
    """
    The representation of a MOM process as reported by momctl.
    This class must be instantiated with the node name as parameter.

    Public attributes:
    name (node name:string)
    server (torque server name:string)
    version (torque version:string)
    jobs (jobs running on node:list of strings)
    directory (torque directory:string)
    active (text following 'MOM active:' in the output:string)

    String attributes whose line is missing from the momctl output are
    left as empty strings.
    """
    def __init__(self, node):
        # NOTE(review): the node name is interpolated into a shell command
        # line unquoted; callers must not pass untrusted input here.
        fh = os.popen("%s -d 0 -h %s" % (MOMCTL, node))
        try:
            self._parseOutput(fh)
        finally:
            fh.close()  # do not leak the pipe

    def _parseOutput(self, lines):
        """Reset all attributes and fill them from momctl output lines."""
        # horrible parsing due to lack of structure in the output
        self.name = ""
        self.server = ""
        self.version = ""
        self.directory = ""
        self.active = ""
        self.jobs = []
        for line in lines:
            # Each regex result is checked: a line that merely starts with
            # the expected keyword but has another layout is skipped.
            if line.startswith("Host"):
                m = re.search(r'Host:\s+(\S+)\s+Server:\s+(\S+)\s+Version:\s+(\S+)', line)
                if m:
                    self.name = m.group(1)
                    self.server = m.group(2)
                    self.version = m.group(3)
            elif line.startswith("HomeDirectory"):
                m = re.search(r'HomeDirectory:\s+(\S+)', line)
                if m:
                    self.directory = m.group(1)
            elif line.startswith("MOM"):
                m = re.search(r'MOM active:\s+(.*)', line)
                if m:
                    self.active = m.group(1)
            elif line.startswith("Job"):
                m = re.search(r'.*\[(.*)\]', line)
                if m:
                    self.jobs.append(m.group(1))
246
class TorqueError(Exception):
    """Raised when a Torque command exits with a non-zero status."""
249
250 def _checkProgram(*args):
251 """
252 Check if the programs passed as argument are found in the current PATH
253 and return a list with corresponding True or False values.
254 """
255
256 ret = []
257 dirnames = os.environ["PATH"].split(os.pathsep)
258 for app in args:
259 for dirname in dirnames:
260 filename = os.path.join(dirname, app)
261 if (os.path.exists(filename) and
262 os.path.isfile(filename) and
263 os.access(filename, os.X_OK)):
264 ret.append(True)
265 break
266 else:
267 ret.append(False)
268
269 return ret
270
271 def _paragraphs(file, separator=None):
272 if not callable(separator):
273 def separator(line): return re.match('^\s+$',line)
274 paragraph = []
275 for line in file:
276 if separator(line):
277 if paragraph:
278 yield ''.join(paragraph)
279 paragraph = []
280 else:
281 paragraph.append(line)
282 if paragraph: yield ''.join(paragraph)
283
# Probe once, when the module is imported, whether the external commands
# can be found; pbsnodes() and mom() consult these flags before running.
_pbsnodesOK, _momctlOK = _checkProgram(PBSNODES, MOMCTL)

Properties

Name Value
svn:eol-style native
svn:keywords Id URL

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28