#--
# EstTT.py -- class for ESTimating Traversal Times via various
# algorithms.  Uses the "strategy" design pattern to handle the
# algorithm framework.
# J. A. Templon, NIKHEF, PDP Group
# $Id: EstTT.py,v 1.7 2005/12/02 15:00:41 templon Exp $
#--

import sys   # used for sys.exit() in Throughput.algorithm below

# the strategy interface (i.e. the generic algorithm interface):
class FindEstimate(object):
    def algorithm(self, lrms, vo, debug):
        """
        lrms  == concrete instance of class lrms (snapshot of LRMS state)
        vo    == virtual organization for which you want the estimate
                 (note: so far this usually maps to the unix group of the mapped user)
        debug == 0 for no debugging output, 1 to print debugging info
        """
        pass

# put concrete algorithm instances here

class Gott(FindEstimate):
    """The original Gott ideas are embodied here.  If you want to
    use the old scheme of looking at jobs matching both the VO
    AND queue like the old system did, do a call like
        Estimator.strategy.queue = 'atlas'
    before actually using the Estimator.estimate() method.
    """
    def __init__(self):
        FindEstimate.__init__(self)
        self.queue = ''
    def algorithm(self, lrms, vo, debug):
        return ett(lrms, self.queue, vo, algorithm='average', debug=debug)

class WaitTime(FindEstimate):
    """This is Gott modified to use longest-wait times instead
    of average wait times.  If you want to
    use the old scheme of looking at jobs matching both the VO
    AND queue like the old system did, do a call like
        Estimator.strategy.queue = 'atlas'
    before actually using the Estimator.estimate() method.
    """
    def __init__(self):
        FindEstimate.__init__(self)
        self.queue = ''
    def algorithm(self, lrms, vo, debug):
        return ett(lrms, self.queue, vo, algorithm='longest', debug=debug)

class Throughput(FindEstimate):
    """This class is based on looking at job throughputs and calculating
    how long it will take to empty all waiting jobs for the desired VO
    based on the observed throughput.  Motivation is that Gott & WaitTime
    do very poorly when the job population is not stable, i.e. when the
    rates of job arrival and job starts are not roughly the same.
    """
    def __init__(self):
        FindEstimate.__init__(self)
        self.twin = 10
        """
        Tunable parameter specifying how far back (how many jobs) in the system
        start history to look when calculating job throughput.  I.e. if twin
        (Throughput WINdow) is 4, the code will look at no more than the last
        four jobs started to see how many jobs per second are being started.
        """
    def algorithm(self, lrms, vo, debug):

        # get rid of the silliest case first

        if lrms.slotsUp < 1:     # system has no job slots at all
            return 7777777

        # gather a couple lists to help us decide

        def allrunfunc(job):
            return job.get('state') == 'running'
        def vowaitfunc(job):
            return job.get('state') == 'queued' and job.get('group') == vo
        allRunningJobs  = lrms.matchingJobs(allrunfunc)
        waitingJobsMyVO = lrms.matchingJobs(vowaitfunc)

        # another very silly case; this VO has waiting jobs, but absolutely no
        # jobs are running.  This is either a short transient that will only last
        # for one sched cycle, or else something is wrong.  Assume the latter.

        if len(allRunningJobs) < 1 and len(waitingJobsMyVO) > 0:
            return 7777777

        # another silly case: someone just restarted the server.
        # symptom (at least on PBS): no free slots, lots of running
        # jobs, many (or all) of which have no 'start' record.

        # note this is not the only symptom ... there may be free
        # slots but still no jobs with historical info.  Some code
        # later on to handle this in most_recent_run ...

        if lrms.slotsFree == 0:
            nr = len(allRunningJobs)
            nns = 0    # Number with No Start
            for j in allRunningJobs:
                if j.get('start') is None: nns = nns + 1
            if float(nns)/nr >= 0.5:
                return 7777777

        nw = len(waitingJobsMyVO)
        if nw < 0:
            print "found negative number of queued jobs!" ; sys.exit(1)
        elif nw == 0:
            if lrms.slotsFree > 0:
                return lrms.schedCycle / 2
            else:
                if len(allRunningJobs) == 0:
                    return lrms.schedCycle / 2
                else:
                    startList = [ ]
                    for j in allRunningJobs:
                        tval = j.get('start')
                        if tval:   # jobs started during last poll cycle won't have 'start'
                            startList.append(tval)
                        else:
                            # rough approx: treat as if started half a sched cycle ago
                            startList.append(lrms.now - lrms.schedCycle/2)
                    startList.sort()   # last element will be most recent start
                    time_sys_full = lrms.now - startList[-1]
                    return time_sys_full + lrms.schedCycle/2
        else:   # there are waiting jobs

            # first check one special case: the one where *all* waiting jobs
            # have been in the system for less than the schedule cycle.
            # If there are enough slots to run them all, skip the throughput
            # calculation.

            waitList = [ ]
            for j in waitingJobsMyVO:
                waitList.append(lrms.now - j.get('qtime'))
            waitList.sort()
            if waitList[-1] < 1.1*lrms.schedCycle \
               and len(waitList) < lrms.slotsFree:
                return lrms.schedCycle / 2

            # calcs based on throughput.  We know since we checked above that at
            # least *some* jobs are running.  Use jobs from our VO if we have
            # them, otherwise use list of all running jobs.

            def vorunfunc(job):
                return job.get('state') == 'running' and job.get('group') == vo
            runningJobsMyVO = lrms.matchingJobs(vorunfunc)
            if len(runningJobsMyVO) > 0:
                runlist = runningJobsMyVO
            else:
                runlist = allRunningJobs
            startList = [ ]
            for j in runlist:
                st = j.get('start')
                if st:
                    startList.append(st)
                else:
                    startList.append(lrms.now - lrms.schedCycle/2)
            startList.sort()

            # decide how many jobs to use to calc throughput; take minimum
            # of number running, number waiting, and the 'throughput window'
            # that can be set on the algorithm.  Note we have to use the
            # startList instead of runlist since there may be starts for
            # which we have no records yet (PBS caching).

            num = min(nw, len(startList), self.twin)
            period = lrms.now - startList[-num]
            if period <= 0:   # all relevant starts happened "now"; avoid division by zero
                period = 1
            thruput = float(num)/period   # jobs per sec

            if len(runningJobsMyVO) > 0:
                return int((nw+1) / thruput)
            else:
                # jobs are running but not yours, so assume you need to finish all
                # waiting jobs before your new one starts
                naw = len(lrms.jobs()) - len(runlist)   # counts all jobs that are not running (ie waiting)
                return int((naw+1) / thruput)

# the "context" controls the strategy:
class TTEstimator(object):
    def __init__(self, strategy):
        self.strategy = strategy
    def estimate(self, lrms, vo, debug):
        return self.strategy.algorithm(lrms, vo, debug)
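
# Usage sketch (illustration only): 'lrms' below stands for a concrete LRMS
# snapshot object supplied by the caller, and 'atlas' is an arbitrary VO name.
# The strategy can be swapped at run time, which is the point of the pattern:
#
#     estimator = TTEstimator(WaitTime())
#     estimator.strategy.queue = 'atlas'      # optional: restrict to one queue
#     seconds = estimator.estimate(lrms, 'atlas', debug=0)
#
#     estimator.strategy = Throughput()       # switch algorithms on the fly
#     seconds = estimator.estimate(lrms, 'atlas', debug=0)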

# there are several algorithms for what to do with queued jobs.
# define them here and provide means to select between them.

def ett_longest_queue_time(server, joblist):
    est = 7777777
    qtlist = [ ]

    for j in joblist:
        if j.get('state') == 'queued':
            qtlist.append(server.now - j.get('qtime'))
    qtlist.sort()
    est = qtlist[-1]   # sorted list so this is the longest waiting.

    return est

def ett_avg_queue_time(server, joblist):
    est = 7777777
    # find average time in queue of all queued jobs
    count = 0
    total = 0.0
    for j in joblist:
        if j.get('state') == 'queued':
            count = count + 1
            total = total + (server.now - j.get('qtime'))
    avgtime = total / count
    # avg time in queue likely to be half of expected time in queue
    # => return twice avg as estimate.
    est = int(2*avgtime)

    return est

# make dict of functions to allow run-time selection
_ALGS = {
    'average' : ett_avg_queue_time,
    'longest' : ett_longest_queue_time }
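
# For example, the 'longest' entry can be looked up and applied like
#     _ALGS['longest'](server, server.jobs_last_query())
# which is how ett() below dispatches on its 'algorithm' argument.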

# adjusted_ett provides a central place to make any changes to
# return values; right now it just maps any estimate shorter than one
# scheduling cycle onto half a cycle.  Could be used e.g. to change units.
def adjusted_ett(rawval, period):
    if rawval < period:
        return int(period / 2.)
    else:
        return int(rawval)
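
# A couple of concrete values to illustrate the adjustment (assuming, for the
# sake of the example, a scheduling cycle of 120 seconds):
#     adjusted_ett(10, 120)   ->  60    (shorter than one cycle: report half a cycle)
#     adjusted_ett(300, 120)  ->  300   (longer than one cycle: passed through)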

def ett(server, queue, voname='', algorithm='longest', debug=0):

    if debug: print 'running for VO', voname

    jfilt = {'state' : 'queued'}

    if queue != '': jfilt['queue'] = queue
    if voname not in ['', None]: jfilt['group'] = voname

    jobsrunning = {'state': 'running'}

    nq = server.nmatch(jfilt)
    if debug: print 'found', nq, 'queued jobs in queue \'' + queue + \
       '\' for VO', voname
    if nq > 0:    # there are queued jobs for this VO
        est = _ALGS[algorithm](server, server.jobs_last_query())
    elif server.slotsFree > 0:
        if debug: print 'found', server.slotsFree, 'free job slots'
        est = 0
    else:
        server.nmatch(jobsrunning)    # get all running jobs
        est = ett_most_recent_run(server.now,
                                  server.jobs_last_query(),
                                  debug)

    return adjusted_ett(est, server.schedCycle)
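
# Example of calling ett() directly (a sketch: 'server' stands for a concrete
# LRMS snapshot object provided by the caller, and 'atlas' is just an
# illustrative VO name):
#
#     seconds = ett(server, queue='', voname='atlas',
#                   algorithm='average', debug=1)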

def ett_most_recent_run(now, joblist, debug=0):

    # find the most recently run job:
    # make a list of start times and take the largest

    starts = list()
    for j in joblist:
        st = j.get('start')
        if st:
            starts.append(st)

    if len(starts) > 0:
        starts.sort()
        last_started = starts[-1]   # last in sorted list == start with largest timestamp
        if debug: print now, last_started
        timefilled = now - last_started
        if debug: print 'most recent run', timefilled
        return timefilled
    else:
        return 7777777   # error code for "I don't know"