#--
# EstTT.py -- class for ESTimating Traversal Times via various
# algorithms. Uses the "strategy" design pattern to handle the
# algorithm framework.
# J. A. Templon, NIKHEF, PDP Group
# $Id$
# Source: $URL$

#--

# The strategy interface (i.e. the generic algorithm interface).
class FindEstimate(object):
    """Base class for traversal-time estimation strategies.

    Concrete strategies override ``algorithm``; this base version does no
    work and simply returns None.
    """

    def algorithm(self, lrms, vo, debug):
        """Compute an estimate for a single VO.

        lrms  -- concrete instance of class lrms (snapshot of LRMS state)
        vo    -- virtual organization for which you want the estimate;
                 so far this usually maps to the unix group of the mapped user
        debug -- 0 means don't debug, 1 means print debugging info
        """
        return None

# Put concrete algorithm instances here.

class Gott(FindEstimate):
    """The original Gott ideas are embodied here.

    Estimates come from ett() using the 'average' queue-time algorithm.
    If you want to use the old scheme of looking at jobs matching both the
    VO AND queue like the old system did, do a call like
        Estimator.strategy.queue = 'atlas'
    before actually using the Estimator.estimate() method.
    """

    def __init__(self):
        super(Gott, self).__init__()
        # Queue to restrict matching jobs to; '' means any queue.
        self.queue = ''

    def algorithm(self, lrms, vo, debug):
        """Delegate to ett() with the 'average' algorithm for group *vo*."""
        return ett(lrms, self.queue, voname=vo, algorithm='average',
                   debug=debug)

class WaitTime(FindEstimate):
    """This is Gott modified to use longest-wait times instead of average
    wait times.

    If you want to use the old scheme of looking at jobs matching both the
    VO AND queue like the old system did, do a call like
        Estimator.strategy.queue = 'atlas'
    before actually using the Estimator.estimate() method.
    """

    def __init__(self):
        super(WaitTime, self).__init__()
        # Queue to restrict matching jobs to; '' means any queue.
        self.queue = ''

    def algorithm(self, lrms, vo, debug):
        """Delegate to ett() with the 'longest' algorithm for group *vo*."""
        return ett(lrms, self.queue, voname=vo, algorithm='longest',
                   debug=debug)

class Throughput(FindEstimate):
    """This class is based on looking at job throughputs and calculating
    how long it will take to empty all waiting jobs for the desired VO
    based on the observed throughput.  Motivation is that Gott & WaitTime
    do very poorly when the job population is not stable, i.e. when the
    rates of job arrival and job starts are not roughly the same.
    """

    # Returned whenever no sensible estimate can be made.
    UNKNOWN = 7777777

    def __init__(self):
        FindEstimate.__init__(self)
        # Tunable parameter specifying how far back (how many jobs) in the
        # system start history to look when calculating job throughput.
        # I.e. if twin (Throughput WINdow) is 4, the code will look at no
        # more than the last four jobs started to see how many jobs per
        # second are being started.
        self.twin = 10

    def algorithm(self, lrms, vo, debug):
        """Estimate the traversal time for a new job of group *vo*.

        lrms  -- LRMS snapshot; this reads slotsUp, slotsFree, now and
                 schedCycle and calls matchingJobs() and jobs()
        vo    -- group (VO) the estimate is for
        debug -- accepted for interface compatibility; not used here

        Returns UNKNOWN (7777777) when no sensible estimate is possible,
        otherwise an estimate in the same time units as lrms.now.
        """
        # get rid of the silliest case first: system has no job slots at all
        if lrms.slotsUp < 1:
            return self.UNKNOWN

        # gather a couple lists to help us decide
        def allrunfunc(job):
            return job.get('state') == 'running'

        def vowaitfunc(job):
            return job.get('state') == 'queued' and job.get('group') == vo

        allRunningJobs = lrms.matchingJobs(allrunfunc)
        waitingJobsMyVO = lrms.matchingJobs(vowaitfunc)
        nr = len(allRunningJobs)
        nw = len(waitingJobsMyVO)

        # another very silly case: this VO has waiting jobs, but absolutely
        # no jobs are running.  This is either a short transient that will
        # only last for one sched cycle, or else something is wrong.
        # Assume the latter.
        if nr == 0 and nw > 0:
            return self.UNKNOWN

        # another silly case: someone just restarted the server.  Symptom
        # (at least on PBS): no free slots, lots of running jobs, many (or
        # all) of which have no 'start' record.  Note this is not the only
        # symptom: there may be free slots but still no jobs with historical
        # info.  The nr > 0 guard avoids dividing by zero when nothing runs;
        # that case falls through to the no-waiting estimate below.
        if lrms.slotsFree == 0 and nr > 0:
            nns = len([j for j in allRunningJobs if j.get('start') is None])
            if float(nns) / nr >= 0.5:
                return self.UNKNOWN

        if nw == 0:
            return self._estimate_no_waiting(lrms, allRunningJobs)
        return self._estimate_from_throughput(lrms, vo, nw, allRunningJobs,
                                              waitingJobsMyVO)

    @staticmethod
    def _start_time(job, lrms):
        """Return job's 'start' time, approximating a missing one."""
        # Jobs started during the last poll cycle have no 'start' record
        # yet; treat them as having started half a sched cycle ago (rough
        # approx).  This must be an absolute time, like real 'start' values.
        return job.get('start') or lrms.now - lrms.schedCycle / 2

    def _estimate_no_waiting(self, lrms, allRunningJobs):
        """Estimate for a VO that has no waiting jobs."""
        if lrms.slotsFree > 0 or len(allRunningJobs) == 0:
            return lrms.schedCycle / 2
        # No free slots: use the time since the most recent job start,
        # plus half a sched cycle.
        latest = max(self._start_time(j, lrms) for j in allRunningJobs)
        time_sys_full = lrms.now - latest
        return time_sys_full + lrms.schedCycle / 2

    def _estimate_from_throughput(self, lrms, vo, nw, allRunningJobs,
                                  waitingJobsMyVO):
        """Estimate for a VO with nw > 0 waiting jobs from the start rate."""
        # First check one special case: all waiting jobs have been in the
        # system for less than ~1 schedule cycle and there are enough free
        # slots to run them plus one more; skip the throughput calculation.
        longest_wait = max(lrms.now - j.get('qtime') for j in waitingJobsMyVO)
        if longest_wait < 1.1 * lrms.schedCycle and nw < lrms.slotsFree:
            return lrms.schedCycle / 2

        # Calcs based on throughput.  The checks in algorithm() guarantee
        # that at least *some* jobs are running.  Use jobs from our VO if we
        # have them, otherwise use the list of all running jobs.
        def vorunfunc(job):
            return job.get('state') == 'running' and job.get('group') == vo

        runningJobsMyVO = lrms.matchingJobs(vorunfunc)
        haveOwnRunning = len(runningJobsMyVO) > 0
        runlist = runningJobsMyVO if haveOwnRunning else allRunningJobs
        startList = sorted(self._start_time(j, lrms) for j in runlist)

        # Decide how many jobs to use to calc throughput: the minimum of
        # number running, number waiting, and the 'throughput window' that
        # can be set on the algorithm.  Note we use startList instead of
        # runlist since there may be starts for which we have no records
        # yet (PBS caching).  Clamp to >= 1 so a non-positive twin cannot
        # yield a zero throughput.
        num = max(1, min(nw, len(startList), self.twin))
        period = lrms.now - startList[-num]
        if period <= 0:
            # considered jobs started "now" (or clock skew): avoid a
            # division by zero by assuming a one-time-unit window.
            period = 1
        thruput = float(num) / period  # jobs per sec

        if haveOwnRunning:
            return int((nw + 1) / thruput)
        # Jobs are running but not yours, so assume you need to finish all
        # waiting jobs before your new one starts.
        # NOTE(review): every job in lrms.jobs() not in runlist is counted
        # as waiting -- confirm jobs() holds only running and queued jobs.
        naw = len(lrms.jobs()) - len(runlist)
        return int((naw + 1) / thruput)

# The "context" controls the strategy:
class TTEstimator(object):
    """Context object that delegates estimation to a pluggable strategy.

    ``strategy`` is any object exposing ``algorithm(lrms, vo, debug)``,
    e.g. an instance of a FindEstimate subclass.  It can be swapped at run
    time by assigning to the attribute.
    """

    def __init__(self, strategy):
        self.strategy = strategy

    def estimate(self, lrms, vo, debug):
        """Return whatever the current strategy's algorithm() returns."""
        chosen = self.strategy
        return chosen.algorithm(lrms, vo, debug)

# There are several algorithms for what to do with queued jobs.
# Define them here and provide means to select between them.

def ett_longest_queue_time(server, list):
    """Return the longest time any queued job in *list* has been waiting.

    server -- object providing the current time as ``server.now``
    list   -- job records supporting ``.get('state')`` / ``.get('qtime')``;
              only records whose state is 'queued' are considered.  (The
              parameter name shadows the builtin but is kept for callers.)

    Returns 7777777 ('unknown') when *list* contains no queued jobs.
    """
    waits = [server.now - j.get('qtime')
             for j in list if j.get('state') == 'queued']
    if not waits:
        # no queued jobs: no basis for an estimate
        return 7777777
    return max(waits)  # the longest-waiting job

def ett_avg_queue_time(server, list):
    """Estimate traversal time as twice the average wait of queued jobs.

    server -- object providing the current time as ``server.now``
    list   -- job records supporting ``.get('state')`` / ``.get('qtime')``;
              only records whose state is 'queued' are considered.  (The
              parameter name shadows the builtin but is kept for callers.)

    Returns an int, or 7777777 ('unknown') when *list* has no queued jobs.
    """
    waits = [server.now - j.get('qtime')
             for j in list if j.get('state') == 'queued']
    if not waits:
        # no queued jobs: avoid dividing by zero, report 'unknown'
        return 7777777
    avgtime = sum(waits, 0.0) / len(waits)
    # avg time in queue likely to be half of expected time in queue
    # => return twice avg as estimate.
    return int(2 * avgtime)

# Map algorithm names to queue-time estimator functions so that ett() can
# select one at run time through its ``algorithm`` argument.
_ALGS = dict(
    average=ett_avg_queue_time,
    longest=ett_longest_queue_time,
)

# adjusted_ett provides a central place to make any changes to return
# values; right now it replaces any value smaller than one scheduling
# period by half a period.  Could be used e.g. to change units.
def adjusted_ett(rawval, period):
    """Post-process a raw traversal-time estimate.

    rawval -- raw estimate produced by one of the estimation functions
    period -- scheduling period (ett() passes server.schedCycle)

    Returns int(period / 2.) when rawval < period, otherwise int(rawval).
    """
    if rawval < period:
        # presumably estimates below one sched cycle can't be resolved, so
        # report half a cycle instead -- NOTE(review): confirm intent
        return int(period / 2.)
    return int(rawval)

def ett(server, queue, voname='', algorithm='longest', debug=0):
    """Estimate traversal time using one of the queue-time algorithms.

    server    -- LRMS snapshot; reads now, slotsFree and schedCycle and uses
                 nmatch() followed by jobs_last_query() to fetch jobs
    queue     -- restrict queued jobs to this queue; '' means any queue
    voname    -- restrict queued jobs to this group; '' or None means any
    algorithm -- key into _ALGS ('average' or 'longest'), used when there
                 are matching queued jobs
    debug     -- if true, print progress information

    Returns the estimate after post-processing by adjusted_ett().
    """
    # Single-argument print() calls: same output under Python 2 and 3.
    if debug:
        print('running for VO %s' % (voname,))

    jfilt = {'state': 'queued'}
    if queue != '':
        jfilt['queue'] = queue
    if voname not in ('', None):
        jfilt['group'] = voname

    nq = server.nmatch(jfilt)
    if debug:
        print("found %s queued jobs in queue '%s' for VO %s"
              % (nq, queue, voname))

    if nq > 0:
        # there are queued jobs for this VO: estimate from their queue times
        est = _ALGS[algorithm](server, server.jobs_last_query())
    elif server.slotsFree > 0:
        if debug:
            print('found %s free job slots' % (server.slotsFree,))
        est = 0
    else:
        # nothing queued and no free slots: look at the running jobs.
        # nmatch() presumably makes jobs_last_query() return all running
        # jobs -- its return value is not needed here.
        server.nmatch({'state': 'running'})
        est = ett_most_recent_run(server.now,
                                  server.jobs_last_query(),
                                  debug)

    return adjusted_ett(est, server.schedCycle)

def ett_most_recent_run(now, joblist, debug=0):
    """Return how long ago the most recently started job in *joblist* began.

    now     -- current time, in the same units as the jobs' 'start' values
    joblist -- job records supporting ``.get('start')``; records whose
               'start' is missing or false are ignored
    debug   -- if true, print intermediate values

    Returns now minus the largest start time, or 777777 ('I don't know')
    when no job has a start time.
    """
    starts = [st for st in (j.get('start') for j in joblist) if st]
    if not starts:
        # NOTE(review): the other 'unknown' values in this module are
        # 7777777 (seven digits); this one has six.  Kept unchanged since
        # callers may depend on it -- confirm which value is intended.
        return 777777  # error code for 'i don't know'

    last_started = max(starts)  # largest timestamp = most recent start
    # Single-argument print() calls: same output under Python 2 and 3.
    if debug:
        print('%s %s' % (now, last_started))
    timefilled = now - last_started
    if debug:
        print('most recent run %s' % (timefilled,))
    return timefilled