/[pdpsoft]/nl.nikhef.pdp.dynsched/trunk/EstTT.py
ViewVC logotype

Contents of /nl.nikhef.pdp.dynsched/trunk/EstTT.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1999 - (show annotations) (download) (as text)
Fri Oct 8 12:18:22 2010 UTC (11 years, 7 months ago) by templon
File MIME type: application/x-python
File size: 10573 byte(s)
still transferring stuff over from old tree

1 #--
2 # EstTT.py -- class for ESTimating Traversal Times via various
3 # algorithms. Use "strategy" design pattern to handle alg
4 # framework.
5 # J. A. Templon, NIKHEF, PDP Group
6 # $Id: EstTT.py,v 1.7 2005/12/02 15:00:41 templon Exp $
7 #--
8
# the strategy interface (ie generic algorithm interface):
class FindEstimate(object):
    """Strategy interface for traversal-time estimation algorithms.

    Concrete strategies subclass this and override ``algorithm``.
    """

    def algorithm(self, lrms, vo, debug):
        """Compute an estimated traversal time (overridden by subclasses).

        lrms  -- concrete instance of class lrms (snapshot of LRMS state)
        vo    -- virtual organization for which the estimate is wanted;
                 so far this usually maps to the unix group of the mapped user
        debug -- 0 means don't debug, 1 means print debugging info

        This base implementation does nothing and returns None.
        """
        return None
19
20 # put concrete algorithm instances here
21
class Gott(FindEstimate):
    """Estimator embodying the original Gott ideas (average queue time).

    To reproduce the old scheme of only looking at jobs that match both
    the VO AND a queue, as the old system did, set the queue first, e.g.
        Estimator.strategy.queue = 'atlas'
    before actually using the Estimator.estimate() method.
    """

    def __init__(self):
        FindEstimate.__init__(self)
        # Queue to restrict matching to; '' means no queue restriction.
        self.queue = ''

    def algorithm(self, lrms, vo, debug):
        """Delegate to ett() with the 'average' queue-time algorithm."""
        return ett(lrms, self.queue, vo,
                   algorithm='average',
                   debug=debug)
34
class WaitTime(FindEstimate):
    """Gott variant that uses the longest wait time instead of the average.

    To reproduce the old scheme of only looking at jobs that match both
    the VO AND a queue, as the old system did, set the queue first, e.g.
        Estimator.strategy.queue = 'atlas'
    before actually using the Estimator.estimate() method.
    """

    def __init__(self):
        FindEstimate.__init__(self)
        # Queue to restrict matching to; '' means no queue restriction.
        self.queue = ''

    def algorithm(self, lrms, vo, debug):
        """Delegate to ett() with the 'longest' queue-time algorithm."""
        return ett(lrms, self.queue, vo,
                   algorithm='longest',
                   debug=debug)
48
class Throughput(FindEstimate):
    """Estimate traversal time from the observed job-start throughput.

    The estimate is how long it would take to empty all waiting jobs for
    the desired VO, based on the throughput observed in the recent job
    start history. Motivation: Gott & WaitTime do very poorly when the job
    population is not stable, i.e. when the rates of job arrival and job
    starts are not roughly the same.
    """

    # Value returned when no sensible estimate can be made.
    _NO_ESTIMATE = 7777777

    def __init__(self):
        FindEstimate.__init__(self)
        # Tunable Throughput WINdow: how far back (how many jobs) in the
        # system start history to look when calculating job throughput.
        # E.g. with twin == 4 the code looks at no more than the last four
        # jobs started to see how many jobs per time unit are being started.
        self.twin = 10

    def _sortedStartTimes(self, lrms, jobs):
        """Return the start times of *jobs*, sorted oldest first.

        Jobs started during the last poll cycle may not have a 'start'
        record yet (e.g. PBS caching). They are assumed to have started
        half a scheduling cycle before lrms.now, which is a rough
        approximation but, unlike a bare duration, is a real timestamp.
        """
        fallback = lrms.now - lrms.schedCycle / 2
        starts = [j.get('start') or fallback for j in jobs]
        starts.sort()
        return starts

    def algorithm(self, lrms, vo, debug):
        """Return the estimated traversal time for a new job of *vo*.

        lrms  -- LRMS snapshot; must provide slotsUp, slotsFree, schedCycle,
                 now, matchingJobs(predicate) and jobs()
        vo    -- unix group / VO of the job being estimated
        debug -- accepted for interface compatibility; unused here

        Returns self._NO_ESTIMATE (7777777) when no sensible estimate can
        be made.
        """
        # Get rid of the silliest case first: the system has no job slots.
        if lrms.slotsUp < 1:
            return self._NO_ESTIMATE

        def allrunfunc(job):
            return job.get('state') == 'running'

        def vowaitfunc(job):
            return job.get('state') == 'queued' and job.get('group') == vo

        allRunningJobs = lrms.matchingJobs(allrunfunc)
        waitingJobsMyVO = lrms.matchingJobs(vowaitfunc)
        nw = len(waitingJobsMyVO)

        # Another very silly case: this VO has waiting jobs, but absolutely
        # no jobs are running. This is either a short transient that will
        # only last for one sched cycle, or else something is wrong.
        # Assume the latter.
        if not allRunningJobs and nw > 0:
            return self._NO_ESTIMATE

        # Another silly case: someone just restarted the server. Symptom
        # (at least on PBS): no free slots, lots of running jobs, many (or
        # all) of which have no 'start' record. Skipped when nothing is
        # running, which would otherwise divide by zero.
        if lrms.slotsFree == 0 and allRunningJobs:
            nns = len([j for j in allRunningJobs if j.get('start') is None])
            if float(nns) / len(allRunningJobs) >= 0.5:
                return self._NO_ESTIMATE

        if nw == 0:
            # Nobody from this VO is waiting. With a free slot (or nothing
            # running at all) assume half a scheduling cycle.
            if lrms.slotsFree > 0 or not allRunningJobs:
                return lrms.schedCycle / 2
            # System is full: use how long ago the most recent job started,
            # i.e. how long the system has been full.
            startList = self._sortedStartTimes(lrms, allRunningJobs)
            time_sys_full = lrms.now - startList[-1]
            return time_sys_full + lrms.schedCycle / 2

        # There are waiting jobs. Special case: all of them have been in the
        # system for less than (about) one scheduling cycle and there are
        # more free slots than waiting jobs, so skip the throughput
        # calculation.
        longestWait = max([lrms.now - j.get('qtime') for j in waitingJobsMyVO])
        if longestWait < 1.1 * lrms.schedCycle and nw < lrms.slotsFree:
            return lrms.schedCycle / 2

        # Throughput-based estimate. The checks above guarantee that at
        # least some jobs are running. Use jobs from our VO if there are
        # any, otherwise the list of all running jobs.
        def vorunfunc(job):
            return job.get('state') == 'running' and job.get('group') == vo

        runningJobsMyVO = lrms.matchingJobs(vorunfunc)
        runlist = runningJobsMyVO or allRunningJobs
        startList = self._sortedStartTimes(lrms, runlist)

        # How many jobs to use for the throughput: the minimum of the number
        # waiting, the number of start times and the throughput window,
        # never less than 1 (a zero window would divide by zero below).
        num = max(1, min(nw, len(startList), self.twin))
        period = lrms.now - startList[-num]
        if period <= 0:
            # A start time at (or after) 'now' would divide by zero or give
            # a negative throughput; treat the period as one time unit.
            period = 1
        thruput = float(num) / period  # jobs per time unit of lrms.now

        if runningJobsMyVO:
            return int((nw + 1) / thruput)
        # Jobs are running but not yours, so assume you need to finish all
        # waiting jobs before your new one starts.
        naw = len(lrms.jobs()) - len(runlist)  # counts all waiting jobs
        return int((naw + 1) / thruput)
177
178 # the "context" controls the strategy:
class TTEstimator(object):
    """Context object of the strategy pattern.

    Holds a strategy object (anything with an ``algorithm(lrms, vo, debug)``
    method) in the public ``strategy`` attribute. Callers can swap or tune
    it at run time. estimate() simply forwards to it.
    """

    def __init__(self, strategy):
        self.strategy = strategy

    def estimate(self, lrms, vo, debug):
        """Return whatever the current strategy's algorithm() returns."""
        chosen = self.strategy
        return chosen.algorithm(lrms, vo, debug)
184
185 # there are several algorithms for what to do with queued jobs.
186 # define them here and provide means to select between them.
187
def ett_longest_queue_time(server, list):
    """Estimate traversal time as the longest current wait of queued jobs.

    server -- object providing ``now`` (current time, same units as the
              jobs' 'qtime' values)
    list   -- iterable of job records supporting ``.get(key)``; only jobs
              whose 'state' is 'queued' are considered

    Returns the longest time any queued job has been waiting, or 7777777
    (sentinel for "no estimate") when no job in *list* is queued.
    """
    # NOTE: the parameter name shadows the builtin ``list``; it is kept so
    # existing keyword callers keep working.
    waits = [server.now - j.get('qtime') for j in list
             if j.get('state') == 'queued']
    if not waits:
        # Previously this raised IndexError on an empty list.
        return 7777777
    return max(waits)
199
def ett_avg_queue_time(server, list):
    """Estimate traversal time as twice the average wait of queued jobs.

    The average time already spent in the queue is likely to be about half
    of the expected total time in the queue, so twice the average is
    returned as the estimate.

    server -- object providing ``now`` (current time, same units as the
              jobs' 'qtime' values)
    list   -- iterable of job records supporting ``.get(key)``; only jobs
              whose 'state' is 'queued' are considered

    Returns int(2 * average wait), or 7777777 (sentinel for "no estimate")
    when no job in *list* is queued.
    """
    # NOTE: the parameter name shadows the builtin ``list``; it is kept so
    # existing keyword callers keep working.
    waits = [server.now - j.get('qtime') for j in list
             if j.get('state') == 'queued']
    if not waits:
        # Previously this raised ZeroDivisionError on an empty list.
        return 7777777
    avgtime = sum(waits, 0.0) / len(waits)
    return int(2 * avgtime)
215
# make dict of functions to allow run-time selection; the keys are the
# values accepted by the 'algorithm' argument of ett()
_ALGS = dict(
    average=ett_avg_queue_time,
    longest=ett_longest_queue_time,
)
220
# adjusted_ett provides a central place to make any changes to
# return values; right now it raises any estimate shorter than one
# scheduling period to half that period. Could be used e.g. to change units.
def adjusted_ett(rawval, period):
    """Post-process a raw traversal-time estimate.

    rawval -- raw estimate from one of the estimation algorithms
    period -- scheduling period (ett() passes server.schedCycle)

    Returns int(period / 2.) when rawval < period, otherwise int(rawval).
    """
    return int(period / 2.) if rawval < period else int(rawval)
229
def ett(server, queue, voname='', algorithm='longest', debug=0):
    """Estimate the traversal time for a new job (Gott-style algorithms).

    server    -- LRMS snapshot; must provide nmatch(filter),
                 jobs_last_query(), now, slotsFree and schedCycle
    queue     -- restrict queued-job matching to this queue ('' = any)
    voname    -- restrict queued-job matching to this group/VO
                 ('' or None = any)
    algorithm -- key into _ALGS selecting how queued jobs are turned into
                 an estimate ('average' or 'longest')
    debug     -- when true, print progress information

    The raw estimate is passed through adjusted_ett() before returning.
    """
    if debug:
        print('running for VO %s' % (voname,))

    queued_filter = {'state': 'queued'}
    if queue != '':
        queued_filter['queue'] = queue
    if voname not in ('', None):
        queued_filter['group'] = voname

    nq = server.nmatch(queued_filter)
    if debug:
        label = "'" + queue + "'"
        print('found %s queued jobs in queue %s for VO %s' % (nq, label, voname))

    if nq > 0:
        # Matching queued jobs exist: apply the selected algorithm to the
        # jobs returned by the query just made.
        raw = _ALGS[algorithm](server, server.jobs_last_query())
    elif server.slotsFree > 0:
        if debug:
            print('found %s free job slots' % (server.slotsFree,))
        raw = 0
    else:
        # Nothing queued but no free slot either: re-query for running jobs
        # and base the estimate on the most recent job start.
        server.nmatch({'state': 'running'})
        raw = ett_most_recent_run(server.now,
                                  server.jobs_last_query(),
                                  debug)

    return adjusted_ett(raw, server.schedCycle)
256
def ett_most_recent_run(now, joblist, debug=0):
    """Return the time elapsed since the most recently started job started.

    now     -- current time, in the same units as the jobs' 'start' values
    joblist -- job records supporting .get(key); jobs without a truthy
               'start' value are ignored
    debug   -- when true, print intermediate values

    Returns 777777 ("i don't know") when no job has a start time.
    """
    starts = [s for s in (job.get('start') for job in joblist) if s]

    if not starts:
        # NOTE(review): other "no estimate" returns in this module use
        # 7777777 (seven 7s); confirm whether 777777 here is intentional.
        return 777777  # error code for 'i don't know'

    # largest timestamp == most recently started job
    last_started = sorted(starts)[-1]
    if debug:
        print('%s %s' % (now, last_started))
    timefilled = now - last_started
    if debug:
        print('most recent run %s' % (timefilled,))
    return timefilled

Properties

Name Value
svn:eol-style native

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28