/[pdpsoft]/nl.nikhef.pdp.lrms-py-generic/trunk/lrms.py
ViewVC logotype

Contents of /nl.nikhef.pdp.lrms-py-generic/trunk/lrms.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1951 - (show annotations) (download) (as text)
Tue Sep 28 11:35:38 2010 UTC (12 years ago) by templon
Original Path: nl.nikhef.pdp.lrms-py-generic/lrms.py
File MIME type: application/x-python
File size: 7285 byte(s)
First Import
1 #--
2 # lrms.py -- generic LRMS classes (backend independent)
3 # $Id: lrms.py,v 2.0 2007/02/20 14:59:02 templon Exp $
4 #--
5
6 # Classes representing a generic LRMS (Local Resource Management
7 # System) -- i.e. a batch queue more or less.
8
9 # This defines the main interface to any type of LRMS, to be used
10 # with the start-time prediction stuff (gott and friends). One
11 # would presumably not import this module directly, rather one
12 # would make a derived class that inherits from these (Server)
13 # and contains guts appropriate to fill in the various fields.
14
15 # first cut 8 June 2004, J. Templon, NIKHEF Amsterdam
16
class Job:
    """
    Simple class to represent all the information we need about a job.

    If a non-empty dictionary is passed, it is used (not copied) as the
    attribute store and is expected to hold all the info about the job.
    Otherwise an empty job is created, to be filled in with Job.set().
    """

    def __init__(self, attdict=None):
        # Any falsy argument (None, {}) yields a fresh, private dictionary.
        if not attdict:
            self.__info__ = {}
        else:
            self.__info__ = attdict

    def set(self, key, val):
        """Store attribute 'key' with value 'val', replacing any old value."""
        self.__info__[key] = val

    def get(self, key):
        """Return the value of attribute 'key', or None if it is not set."""
        return self.__info__.get(key)

    def attDictString(self):
        """Return the repr() of the complete attribute dictionary."""
        return repr(self.__info__)

    def __str__(self):
        """
        Return a one-line summary: the jobid (or 'Unknown' when missing or
        empty), followed by whichever of user, group, queue and state are
        set to a non-empty value.
        """
        # %s formatting rather than '+' so that non-string attribute
        # values (e.g. a numeric jobid) do not raise TypeError.
        parts = ['jobid :: %s' % (self.get('jobid') or 'Unknown',)]
        for key in ('user', 'group', 'queue', 'state'):
            val = self.get(key)
            if val:
                parts.append('%s :: %s' % (key, val))
        return '; '.join(parts)
56
def filt2str(d):
    """
    Given a dictionary containing selection criteria on which to search the
    job database, return a string suitable for indexing into cached copies
    of the selection.  To ensure reproducibility, the keys are sorted and
    the result is built as a "key.val.key.val..." string.  An empty
    dictionary gives the empty string.
    """
    # sorted() accepts any iterable of keys, so this does not depend on
    # d.keys() returning a list that can be sorted in place.
    return ".".join(["%s.%s" % (k, d[k]) for k in sorted(d)])
71
class Server(object):
    """
    Generic, backend-independent view of an LRMS server: a collection of
    job objects indexed by job id, a few slot counters, a snapshot
    timestamp, and a cache of filtered views of the job collection.

    Job objects only need to provide a get(key) method returning the
    attribute value (or None).

    NOTE: cached views are not invalidated by addjob() / deletejob(); a
    view reflects the jobs that were known when it was first requested.
    """

    def __init__(self, schedCycle=26):
        self.__jobdict__ = {}
        self.__evtime__ = 0  # set this to be time of current event

        self.__scache__ = {}  # caches results of various slices to speed up

        # Index string of the most recent nmatch() query; '' means "all
        # jobs".  Initialised here so that jobs_last_query() and
        # jobids_last_query() work even before nmatch() has been called.
        self.__lastquery__ = ''

        # attributes for 'slots' ... using PBS we don't
        # necessarily have access to info about physical CPUs.

        # slotsUp means slots that can run, or are running, jobs right now.
        # so 'Up' in the sense of 'uptime'.  slotsFree are those 'up'
        # slots that don't have jobs assigned to them right now.

        self.__slotsUp__ = -1
        self.__slotsFree__ = -1

        # note: if your lrms doesn't have the concept of a scheduler cycle,
        # the appropriate value would be twice the average time it takes
        # from handing a job to your lrms, until it starts to actually run
        # (assuming the job doesn't have to wait for a free slot first).

        self.schedCycle = schedCycle

    def jobs(self):
        """
        Return list of Job objects corresponding to all jobs known to
        the system
        """
        return list(self.__jobdict__.values())

    def matchingJobs(self, filtd):
        """
        Return list of Job objects that corresponds to all jobs known to
        the system that match the attributes provided in dictionary filtd.
        The view is computed once per distinct filter and then served from
        the search cache.
        This function and the later nmatch do essentially the same thing;
        a future release should rationalize this behavior.
        """
        indstr = filt2str(filtd)
        if indstr not in self.__scache__:
            self.__scache__[indstr] = self.mkview(filtd)
        return list(self.__scache__[indstr].values())

    def get_slotsUp(self): return self.__slotsUp__
    def set_slotsUp(self, n): self.__slotsUp__ = n
    slotsUp = property(get_slotsUp, set_slotsUp,
                       doc="return number of job slots "
                           "that are online (up) and controlled by this server")

    def get_slotsFree(self): return self.__slotsFree__
    def set_slotsFree(self, n): self.__slotsFree__ = n
    slotsFree = property(get_slotsFree, set_slotsFree,
                         doc="return number of "
                             "free job slots")

    def get_snaptime(self): return self.__evtime__
    def set_snaptime(self, timestamp): self.__evtime__ = timestamp
    now = property(get_snaptime, set_snaptime,
                   doc="timestamp in seconds "
                       "specifying when the state currently in this server "
                       "object was captured")

    def _job_matches(self, job, filtd):
        """Return True if job.get(k) equals filtd[k] for every key k."""
        for k in filtd:
            if job.get(k) != filtd[k]:
                return False  # first mismatch decides; stop looking
        return True

    def mkview(self, filtd):
        """
        Construct and return a dict (job id -> job) of all jobs matching
        the criteria in dict 'filtd'.  The result is NOT stored in the
        search cache here; that is left to the caller.  If a cached view
        for this filter already exists, a warning is printed and the
        cached view is returned instead.
        """
        indstr = filt2str(filtd)
        if indstr in self.__scache__:
            print('blew it somewhere, trying to create a pre-existing cache!')
            return self.__scache__[indstr]

        reslt = {}
        for jid in list(self.__jobdict__.keys()):
            j = self.getjob(jid)
            if self._job_matches(j, filtd):
                reslt[jid] = j
        return reslt

    # returns number of jobs matching filter criteria
    # also caches the 'index' string from the query to be backwards
    # compatible with the original.

    def nmatch(self, filtd=None):
        """
        Return the number of jobs matching the criteria in dict 'filtd'
        (all jobs when filtd is empty or omitted), and remember the query
        so that jobs_last_query() / jobids_last_query() can return the
        corresponding jobs.
        """
        if not filtd:
            self.__lastquery__ = ''
            return len(self.__jobdict__)
        indstr = filt2str(filtd)
        self.__lastquery__ = indstr
        if indstr not in self.__scache__:
            self.__scache__[indstr] = self.mkview(filtd)
        return len(self.__scache__[indstr])

    def jobs_last_query(self):
        """Return list of the jobs selected by the most recent nmatch()."""
        if self.__lastquery__ == '':
            return list(self.__jobdict__.values())
        return list(self.__scache__[self.__lastquery__].values())

    def jobids_last_query(self):
        """Return list of the job ids selected by the most recent nmatch()."""
        if self.__lastquery__ == '':
            return list(self.__jobdict__.keys())
        return list(self.__scache__[self.__lastquery__].keys())

    def getjobids(self, filter=None):
        """
        Return list of ids of all jobs matching the criteria in dict
        'filter' (all job ids when filter is empty or omitted).  Unlike
        matchingJobs() and nmatch(), this does not use the search cache.
        """
        if not filter:
            return list(self.__jobdict__.keys())
        newlist = []
        for j in list(self.__jobdict__.keys()):
            if self._job_matches(self.__jobdict__[j], filter):
                newlist.append(j)
        return newlist

    def getjob(self, jid):
        """Return the job stored under id 'jid'; KeyError if unknown."""
        return self.__jobdict__[jid]

    def addjob(self, jid, job):
        """Store 'job' under id 'jid', replacing any job already there."""
        self.__jobdict__[jid] = job

    def deletejob(self, jid):
        """Remove the job stored under id 'jid'; KeyError if unknown."""
        del self.__jobdict__[jid]
204

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28