/[pdpsoft]/nl.nikhef.pdp.dynsched/trunk/EstTT.py
ViewVC logotype

Contents of /nl.nikhef.pdp.dynsched/trunk/EstTT.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2007 - (show annotations) (download) (as text)
Fri Oct 8 12:50:40 2010 UTC (11 years, 7 months ago) by templon
File MIME type: application/x-python
File size: 10541 byte(s)
added header lines

1 #--
2 # EstTT.py -- class for ESTimating Traversal Times via various
3 # algorithms. Use "strategy" design pattern to handle alg
4 # framework.
5 # J. A. Templon, NIKHEF, PDP Group
6 # $Id$
7 # Source: $URL$
8
9 #--
10
11 # the strategy interface (ie generic algorithm interface):
class FindEstimate(object):
    """Strategy interface: the generic shape shared by every estimation algorithm."""

    def algorithm(self, lrms, vo, debug):
        """Placeholder estimate; concrete strategies override this method.

        lrms  -- concrete instance of class lrms (snapshot of LRMS state)
        vo    -- virtual organization for which the estimate is wanted;
                 so far this usually maps to the unix group of the mapped user
        debug -- 0 means don't debug, 1 means print debugging info

        The base implementation does nothing and returns None.
        """
        return None
21
22 # put concrete algorithm instances here
23
class Gott(FindEstimate):
    """Concrete strategy embodying the original Gott ideas.

    The estimate is delegated to ett() with the 'average' algorithm.
    By default the queue filter is the empty string.  To get the old
    scheme, which looked at jobs matching both the VO AND the queue,
    set the queue on the strategy first, e.g.
        Estimator.strategy.queue = 'atlas'
    before actually calling Estimator.estimate().
    """

    def __init__(self):
        super(Gott, self).__init__()
        self.queue = ''  # empty string: ett() adds no queue filter

    def algorithm(self, lrms, vo, debug):
        """Return ett()'s 'average' estimate for vo on this strategy's queue."""
        return ett(lrms, self.queue, vo, debug=debug, algorithm='average')
36
class WaitTime(FindEstimate):
    """Gott variant that uses longest-wait times instead of average ones.

    The estimate is delegated to ett() with the 'longest' algorithm.
    By default the queue filter is the empty string.  To get the old
    scheme, which looked at jobs matching both the VO AND the queue,
    set the queue on the strategy first, e.g.
        Estimator.strategy.queue = 'atlas'
    before actually calling Estimator.estimate().
    """

    def __init__(self):
        super(WaitTime, self).__init__()
        self.queue = ''  # empty string: ett() adds no queue filter

    def algorithm(self, lrms, vo, debug):
        """Return ett()'s 'longest' estimate for vo on this strategy's queue."""
        return ett(lrms, self.queue, vo, debug=debug, algorithm='longest')
50
class Throughput(FindEstimate):
    """Estimate based on observed job throughput.

    This class looks at job throughputs and calculates how long it will
    take to empty all waiting jobs for the desired VO based on the
    observed throughput.  Motivation is that Gott & WaitTime do very
    poorly when the job population is not stable, ie when the rates of
    job arrival and job starts are not roughly the same.
    """

    def __init__(self):
        FindEstimate.__init__(self)
        # twin (Throughput WINdow): tunable parameter specifying how far
        # back (how many jobs) in the system start history to look when
        # calculating job throughput.  IE if twin is 4, code will look at
        # no more than the last four jobs started to see how many jobs per
        # second are being started.
        self.twin = 10

    def algorithm(self, lrms, vo, debug):
        """Return the estimated traversal time for a new job of 'vo'.

        lrms  -- LRMS snapshot; this method reads slotsUp, slotsFree,
                 schedCycle and now, and calls matchingJobs() and jobs()
        vo    -- group name compared against each job's 'group' field
        debug -- accepted for interface compatibility; not used here

        Returns 7777777 when no sensible estimate can be made (no job
        slots, waiting jobs but nothing running, or at least half of the
        running jobs lack a 'start' record while the system is full).
        """

        # get rid of the silliest case first
        if lrms.slotsUp < 1:  # system has no job slots at all
            return 7777777

        # gather a couple lists to help us decide
        def allrunfunc(job):
            return job.get('state') == 'running'

        def vowaitfunc(job):
            return job.get('state') == 'queued' and job.get('group') == vo

        allRunningJobs = lrms.matchingJobs(allrunfunc)
        waitingJobsMyVO = lrms.matchingJobs(vowaitfunc)

        # another very silly case; this VO has waiting jobs, but absolutely
        # no jobs are running.  This is either a short transient that will
        # only last for one sched cycle, or else something is wrong.
        # Assume the latter.
        if len(allRunningJobs) < 1 and len(waitingJobsMyVO) > 0:
            return 7777777

        # another silly case: someone just restarted the server.
        # symptom (at least on PBS): no free slots, lots of running
        # jobs, many (or all) of which have no 'start' record.
        #
        # note this is not the only symptom ... there may be free
        # slots but still no jobs with historical info.
        if lrms.slotsFree == 0:
            nr = len(allRunningJobs)
            nns = 0  # Number with No Start
            for j in allRunningJobs:
                if j.get('start') is None:
                    nns = nns + 1
            # nr can be zero here (no free slots, yet nothing running);
            # skip the ratio then instead of dividing by zero, so the
            # "nothing running" handling below is actually reachable.
            if nr > 0 and float(nns) / nr >= 0.5:
                return 7777777

        nw = len(waitingJobsMyVO)
        if nw == 0:
            if lrms.slotsFree > 0:
                return lrms.schedCycle / 2
            if len(allRunningJobs) == 0:
                return lrms.schedCycle / 2
            startList = []
            for j in allRunningJobs:
                tval = j.get('start')
                if tval:
                    startList.append(tval)
                else:
                    # jobs started during last poll cycle won't have
                    # 'start'; approximate the start *time* (not a
                    # duration) the same way the throughput branch does.
                    startList.append(lrms.now - lrms.schedCycle / 2)
            startList.sort()  # last element will be most recent start
            time_sys_full = lrms.now - startList[-1]
            return time_sys_full + lrms.schedCycle / 2

        # there are waiting jobs

        # first check one special case: the one where *all* waiting jobs
        # have been in the system for less than the schedule cycle.
        # if there are enough slots to run them all, skip the throughput
        # calculation.
        waitList = []
        for j in waitingJobsMyVO:
            waitList.append(lrms.now - j.get('qtime'))
        waitList.sort()
        if waitList[-1] < 1.1 * lrms.schedCycle \
                and len(waitList) < lrms.slotsFree:
            return lrms.schedCycle / 2

        # calcs based on throughput.  we know since we checked above that
        # at least *some* jobs are running.  Use jobs from our VO if we
        # have them, otherwise use list of all running jobs.
        def vorunfunc(job):
            return job.get('state') == 'running' and job.get('group') == vo

        runningJobsMyVO = lrms.matchingJobs(vorunfunc)
        if len(runningJobsMyVO) > 0:
            runlist = runningJobsMyVO
        else:
            runlist = allRunningJobs
        startList = []
        for j in runlist:
            st = j.get('start')
            if st:
                startList.append(st)
            else:
                startList.append(lrms.now - lrms.schedCycle / 2)
        startList.sort()

        # decide how many jobs to use to calc throughput; take minimum
        # of number running, number waiting, and the 'throughput window'
        # that can be set on the algorithm.  Note we have to use the
        # startList instead of runlist since there may be starts for
        # which we have no records yet (PBS caching).
        # Clamp to at least one job: a window of zero (twin <= 0) would
        # give a throughput of zero and a division by zero below.
        num = max(1, min(nw, len(startList), self.twin))
        period = lrms.now - startList[-num]
        if period <= 0:
            # a start stamped at (or after) 'now' would divide by zero or
            # give a negative rate; treat it as one second ago.
            period = 1
        thruput = float(num) / period  # jobs per sec

        if len(runningJobsMyVO) > 0:
            return int((nw + 1) / thruput)
        # jobs are running but not yours, so assume you need to finish all
        # waiting jobs before your new one starts
        naw = len(lrms.jobs()) - len(runlist)  # counts all waiting jobs
        return int((naw + 1) / thruput)
179
180 # the "context" controls the strategy:
class TTEstimator(object):
    """Context of the strategy pattern: stores a strategy and runs it on demand.

    The 'strategy' attribute is public; it can be inspected or replaced
    after construction.
    """

    def __init__(self, strategy):
        self.strategy = strategy

    def estimate(self, lrms, vo, debug):
        """Forward (lrms, vo, debug) to the strategy's algorithm() and return its result."""
        chosen = self.strategy
        return chosen.algorithm(lrms, vo, debug)
186
187 # there are several algorithms for what to do with queued jobs.
188 # define them here and provide means to select between them.
189
def ett_longest_queue_time(server, list):
    """Return the longest time any queued job in 'list' has been waiting.

    server -- object providing 'now', the current time
    list   -- iterable of jobs; each job is read through get('state') and
              get('qtime').  (The parameter name shadows the builtin but
              is kept for interface compatibility.)

    Only jobs whose state is 'queued' are considered.  If there are none,
    the "don't know" value 7777777 is returned instead of raising
    IndexError on an empty list.
    """
    waits = [server.now - j.get('qtime')
             for j in list
             if j.get('state') == 'queued']
    if not waits:
        return 7777777
    return max(waits)
201
def ett_avg_queue_time(server, list):
    """Return twice the average time the queued jobs in 'list' have waited.

    server -- object providing 'now', the current time
    list   -- iterable of jobs; each job is read through get('state') and
              get('qtime').  (The parameter name shadows the builtin but
              is kept for interface compatibility.)

    Only jobs whose state is 'queued' are considered.  The average time
    in queue is likely to be half of the expected time in queue, so twice
    the average (truncated to int) is returned as the estimate.  If there
    are no queued jobs, the "don't know" value 7777777 is returned instead
    of dividing by zero.
    """
    waits = [server.now - j.get('qtime')
             for j in list
             if j.get('state') == 'queued']
    if not waits:
        return 7777777
    # start the sum at 0.0 so the division below is a float division
    avgtime = sum(waits, 0.0) / len(waits)
    return int(2 * avgtime)
217
# run-time selection table: algorithm name -> function implementing it
_ALGS = dict(
    average=ett_avg_queue_time,
    longest=ett_longest_queue_time,
)
222
# adjusted_ett provides a central place to make any changes to
# return values; right now it only replaces estimates shorter than
# 'period' with half of 'period'.  Could be used e.g. to change units.
def adjusted_ett(rawval, period):
    """Return the final integer estimate derived from 'rawval'.

    rawval -- the raw estimate
    period -- threshold below which the raw estimate is not used

    If rawval is smaller than period, int(period / 2.) is returned;
    otherwise rawval truncated to int.
    """
    floor_applies = rawval < period
    return int(period / 2.) if floor_applies else int(rawval)
231
def ett(server, queue, voname='', algorithm='longest', debug=0):
    """Estimate the traversal time for jobs of 'voname' (optionally per queue).

    server    -- LRMS snapshot; this function reads slotsFree, schedCycle
                 and now, and calls nmatch() and jobs_last_query()
    queue     -- queue name to filter on; '' means no queue filter
    voname    -- group to filter on; '' or None means no group filter
    algorithm -- key into _ALGS ('average' or 'longest') selecting what to
                 do when queued jobs are found
    debug     -- if true, print progress information

    Returns the raw estimate after it has been passed through
    adjusted_ett() together with server.schedCycle.
    """
    # Debug output uses single-argument print calls so the same text is
    # produced whether print is a statement or a function.
    if debug:
        print('running for VO %s' % (voname,))

    jfilt = {'state': 'queued'}
    if queue != '':
        jfilt['queue'] = queue
    if voname not in ['', None]:
        jfilt['group'] = voname

    jobsrunning = {'state': 'running'}

    nq = server.nmatch(jfilt)
    if debug:
        print("found %s queued jobs in queue '%s' for VO %s"
              % (nq, queue, voname))

    if nq > 0:  # there are queued jobs for this VO
        est = _ALGS[algorithm](server, server.jobs_last_query())
    elif server.slotsFree > 0:
        if debug:
            print('found %s free job slots' % (server.slotsFree,))
        est = 0
    else:
        server.nmatch(jobsrunning)  # get all running jobs
        est = ett_most_recent_run(server.now,
                                  server.jobs_last_query(),
                                  debug)

    return adjusted_ett(est, server.schedCycle)
258
def ett_most_recent_run(now, joblist, debug=0):
    """Return the time elapsed since the most recently started job began.

    now     -- current time, in the same units as the jobs' 'start' values
    joblist -- iterable of jobs; each job is read through get('start')
    debug   -- if true, print the intermediate values

    Jobs whose 'start' value is missing or otherwise false are ignored.
    If no job has a usable start time, 777777 is returned as the error
    code for "I don't know".
    NOTE(review): this sentinel has six digits while the other estimators
    in this file return 7777777 -- confirm whether that is intentional.
    """
    starts = [st for st in (j.get('start') for j in joblist) if st]
    if not starts:
        return 777777  # error code for 'i don't know'

    last_started = max(starts)  # largest timestamp == most recent start
    # Single-argument print calls give the same output whether print is
    # a statement or a function.
    if debug:
        print('%s %s' % (now, last_started))
    timefilled = now - last_started
    if debug:
        print('most recent run %s' % (timefilled,))
    return timefilled

Properties

Name Value
svn:eol-style native
svn:keywords Id URL

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28