/[pdpsoft]/nl.nikhef.pdp.lrms-py-generic/trunk/lrms.py
ViewVC logotype

Contents of /nl.nikhef.pdp.lrms-py-generic/trunk/lrms.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1971 - (show annotations) (download) (as text)
Tue Sep 28 13:21:21 2010 UTC (12 years ago) by templon
File MIME type: application/x-python
File size: 7253 byte(s)
update svn header keywords

#--
# lrms.py -- generic LRMS classes (backend independent)
# $Id$
# Source: $URL$
#--

# Classes representing a generic LRMS (Local Resource Management
# System) -- i.e. a batch queue more or less.

# This defines the main interface to any type of LRMS, to be used
# with the start-time prediction stuff (gott and friends).  One
# would presumably not import this module directly, rather one
# would make a derived class that inherits from these (Server)
# and contains guts appropriate to fill in the various fields.

# first cut 8 June 2004, J. Templon, NIKHEF Amsterdam
17
class Job:
    """
    Simple container for all the information we need about a job.

    If a non-empty dictionary is passed to the constructor it is used
    (not copied) as the job's attribute store.  Otherwise an empty job
    is created, to be filled in using Job.set().
    """

    def __init__(self, attdict=None):
        # A falsy attdict (None or an empty mapping) yields a fresh,
        # private dictionary; a non-empty one is shared with the caller.
        if not attdict:
            self.__info__ = {}
        else:
            self.__info__ = attdict

    def set(self, key, val):
        """Store attribute 'key' with value 'val', replacing any old value."""
        self.__info__[key] = val

    def get(self, key):
        """Return the value of attribute 'key', or None if it is not set."""
        # dict.get avoids building and scanning a key list on every lookup.
        return self.__info__.get(key)

    def attDictString(self):
        """Return the repr() of the underlying attribute dictionary."""
        return repr(self.__info__)

    def __str__(self):
        """
        Return a one-line summary of the form
        'jobid :: X; user :: Y; group :: Z; queue :: Q; state :: S'.

        Attributes that are unset (or have a false value) are left out,
        except for jobid which is then reported as 'Unknown'.
        """
        jobid = self.get('jobid')
        if jobid:
            # %-formatting rather than '+' so that non-string values
            # (e.g. an integer job id) do not raise a TypeError.
            parts = ['jobid :: %s' % (jobid,)]
        else:
            parts = ['jobid :: Unknown']
        for key in ('user', 'group', 'queue', 'state'):
            val = self.get(key)
            if val:
                parts.append('%s :: %s' % (key, val))
        return '; '.join(parts)
57
def filt2str(d):
    """
    Given a dictionary containing selection criteria on which to search
    the job database, return a string suitable for indexing into cached
    copies of the selection.  To ensure reproducibility, the keys are
    sorted and the result has the form 'key.val.key.val...'.  An empty
    dictionary gives the empty string.
    """
    # sorted() works on both the key list of Python 2 and the key view of
    # Python 3 (where 'd.keys().sort()' is not available).
    return ".".join(["%s.%s" % (k, d[k]) for k in sorted(d.keys())])
72
class Server(object):
    """
    Generic, backend-independent representation of an LRMS server.

    Holds a dictionary of job objects indexed by job id, a cache of
    filtered views ("slices") of that dictionary, slot counters and the
    timestamp of the captured state.  Job objects only need to provide a
    get(key) method returning None for unknown keys.
    """

    def __init__(self, schedCycle=26):
        self.__jobdict__ = {}     # job id -> job object
        self.__evtime__ = 0       # set this to be time of current event

        # caches results of various slices to speed up;
        # maps filt2str(filter) -> {job id: job object}
        self.__scache__ = {}

        # Index string and filter of the most recent nmatch() call.
        # Initialised here so that jobs_last_query()/jobids_last_query()
        # work (returning all jobs) before nmatch() was ever called.
        self.__lastquery__ = ''
        self.__lastfilt__ = {}

        # attributes for 'slots' ... using PBS we don't
        # necessarily have access to info about physical CPUs.

        # slotsUp means slots that can run, or are running, jobs right now.
        # so 'Up' in the sense of 'uptime'.  slotsFree are those 'up'
        # slots that don't have jobs assigned to them right now.

        self.__slotsUp__ = -1
        self.__slotsFree__ = -1

        # note: if your lrms doesn't have the concept of a scheduler cycle,
        # the appropriate value would be twice the average time it takes
        # from handing a job to your lrms, until it starts to actually run
        # (assuming the job doesn't have to wait for a free slot first).

        self.schedCycle = schedCycle

    # -- private helpers ------------------------------------------------

    @staticmethod
    def _matches(job, filtd):
        """Return True if job.get(k) == filtd[k] for every key k in filtd."""
        for key in filtd:
            if job.get(key) != filtd[key]:
                return False
        return True

    def _cached_view(self, filtd):
        """Return the {job id: job} view for filtd, building it if needed."""
        indstr = filt2str(filtd)
        if indstr not in self.__scache__:
            self.__scache__[indstr] = self.mkview(filtd)
        return self.__scache__[indstr]

    def _last_view(self):
        """Return the {job id: job} mapping selected by the last nmatch()."""
        if self.__lastquery__ == '':
            return self.__jobdict__
        return self._cached_view(self.__lastfilt__)

    # -- queries --------------------------------------------------------

    def jobs(self):
        """
        Return list of Job objects corresponding to all jobs known to
        the system
        """
        return list(self.__jobdict__.values())

    def matchingJobs(self, filtd):
        """
        Return list of Job objects that corresponds to all jobs known to
        the system that match the attributes provided in dictionary filtd.
        This function and the later nmatch do essentially the same thing;
        a future release should rationalize this behavior.
        """
        return list(self._cached_view(filtd).values())

    def get_slotsUp(self):
        return self.__slotsUp__

    def set_slotsUp(self, n):
        self.__slotsUp__ = n

    slotsUp = property(get_slotsUp, set_slotsUp,
                       doc="return number of job slots "
                           "that are online (up) and controlled by this server")

    def get_slotsFree(self):
        return self.__slotsFree__

    def set_slotsFree(self, n):
        self.__slotsFree__ = n

    slotsFree = property(get_slotsFree, set_slotsFree,
                         doc="return number of "
                             "free job slots")

    def get_snaptime(self):
        return self.__evtime__

    def set_snaptime(self, timestamp):
        self.__evtime__ = timestamp

    now = property(get_snaptime, set_snaptime,
                   doc="timestamp in seconds "
                       "specifying when the state currently in this server "
                       "object was captured")

    def mkview(self, filtd):
        """
        Construct and return a dict {job id: job} of all jobs matching the
        criteria in dict 'filtd'.  The result is NOT stored in the search
        cache here; callers do that.  If a cached view for this filter
        already exists, a warning is printed and the cached view returned.
        """
        indstr = filt2str(filtd)
        if indstr in self.__scache__:
            print('blew it somewhere, trying to create a pre-existing cache!')
            return self.__scache__[indstr]

        reslt = {}
        # iterate over a copy of the ids so getjob() overrides cannot
        # disturb the iteration
        for jid in list(self.__jobdict__):
            job = self.getjob(jid)
            if self._matches(job, filtd):
                reslt[jid] = job
        return reslt

    # returns number of jobs matching filter criteria
    # also caches the 'index' string from the query to be backwards
    # compatible with the original.

    def nmatch(self, filtd=None):
        """
        Return the number of jobs matching filter dict 'filtd'; with no
        (or an empty) filter, the total number of jobs.  The query is
        remembered for jobs_last_query() and jobids_last_query().
        """
        if not filtd:
            self.__lastquery__ = ''
            self.__lastfilt__ = {}
            return len(self.__jobdict__)
        self.__lastquery__ = filt2str(filtd)
        # keep a private copy so later changes by the caller to its own
        # dict cannot alter what "last query" means
        self.__lastfilt__ = dict(filtd)
        return len(self._cached_view(filtd))

    def jobs_last_query(self):
        """Return list of the jobs selected by the most recent nmatch()."""
        return list(self._last_view().values())

    def jobids_last_query(self):
        """Return list of the job ids selected by the most recent nmatch()."""
        return list(self._last_view().keys())

    def getjobids(self, filter=None):
        """
        Return list of ids of all jobs matching dict 'filter' (all job
        ids if no or an empty filter is given).  Does not use the cache.
        """
        if not filter:
            return list(self.__jobdict__)
        return [jid for jid in self.__jobdict__
                if self._matches(self.__jobdict__[jid], filter)]

    def getjob(self, jid):
        """Return the job stored under id 'jid'; KeyError if unknown."""
        return self.__jobdict__[jid]

    def addjob(self, jid, job):
        """Store 'job' under id 'jid', replacing any job with that id."""
        self.__jobdict__[jid] = job
        # cached views no longer reflect the job set; drop them
        self.__scache__.clear()

    def deletejob(self, jid):
        """Remove the job with id 'jid'; KeyError if unknown."""
        del self.__jobdict__[jid]
        # cached views no longer reflect the job set; drop them
        self.__scache__.clear()
205

Properties

Name Value
svn:keywords Id URL

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28