#--
# EstTT.py -- class for ESTimating Traversal Times via various
# algorithms. Use "strategy" design pattern to handle alg
# framework.
# J. A. Templon, NIKHEF, PDP Group
# $Id$
# Source: $URL$
#--
|
|
# the strategy interface (ie generic algorithm interface):
12 |
|
|
class FindEstimate(object):
    """Generic strategy interface for traversal-time estimators.

    Concrete subclasses override algorithm(); this base implementation
    does nothing and returns None.
    """

    def algorithm(self, lrms, vo, debug):
        """Estimate the traversal time for one VO.

        lrms  -- concrete instance of class lrms (snapshot of LRMS state)
        vo    -- virtual organization for which you want the estimate
                 (note: so far this usually maps to the unix group of
                 the mapped user)
        debug -- 0 is don't debug, 1 is to print debugging info
        """
        return None
21 |
|
|
|
22 |
|
|
# put concrete algorithm instances here
23 |
|
|
|
24 |
|
|
class Gott(FindEstimate):
    """Estimator embodying the original Gott ideas.

    To use the old scheme of looking at jobs matching both the VO
    AND the queue (as the old system did), do a call like

        Estimator.strategy.queue = 'atlas'

    before actually using the Estimator.estimate() method.
    """

    def __init__(self):
        FindEstimate.__init__(self)
        # empty string means "do not restrict matching to a queue"
        self.queue = ''

    def algorithm(self, lrms, vo, debug):
        # delegate to the generic ett() helper, averaging queue waits
        return ett(lrms, self.queue, vo, algorithm='average', debug=debug)
36 |
|
|
|
37 |
|
|
class WaitTime(FindEstimate):
    """Gott modified to use longest-wait times instead of average waits.

    To use the old scheme of looking at jobs matching both the VO
    AND the queue (as the old system did), do a call like

        Estimator.strategy.queue = 'atlas'

    before actually using the Estimator.estimate() method.
    """

    def __init__(self):
        FindEstimate.__init__(self)
        # empty string means "do not restrict matching to a queue"
        self.queue = ''

    def algorithm(self, lrms, vo, debug):
        # delegate to the generic ett() helper, using the longest wait
        return ett(lrms, self.queue, vo, algorithm='longest', debug=debug)
50 |
|
|
|
51 |
|
|
class Throughput(FindEstimate):
    """Estimator based on observed job throughput.

    Calculates how long it will take to empty all waiting jobs for the
    desired VO based on the observed throughput.  Motivation is that
    Gott & WaitTime do very poorly when the job population is not
    stable, ie when the rates of job arrival and job starts are not
    roughly the same.
    """

    def __init__(self):
        FindEstimate.__init__(self)
        # Tunable parameter specifying how far back (how many jobs) in
        # the system start history to look when calculating job
        # throughput.  IE if twin (Throughput WINdow) is 4, code will
        # look at no more than the last four jobs started to see how
        # many jobs per second are being started.
        self.twin = 10

    def algorithm(self, lrms, vo, debug):
        """Return the estimated traversal time (seconds) for VO *vo*.

        lrms -- LRMS snapshot; uses slotsUp, slotsFree, now, schedCycle,
                matchingJobs() and jobs()
        7777777 is the module-wide "don't know / broken state" sentinel.
        """

        # get rid of the silliest case first
        if lrms.slotsUp < 1:  # system has no job slots at all
            return 7777777

        # gather a couple lists to help us decide
        def allrunfunc(job):
            return job.get('state') == 'running'
        def vowaitfunc(job):
            return job.get('state') == 'queued' and job.get('group') == vo
        allRunningJobs = lrms.matchingJobs(allrunfunc)
        waitingJobsMyVO = lrms.matchingJobs(vowaitfunc)

        # another very silly case; this VO has waiting jobs, but absolutely no
        # jobs are running. This is either a short transient that will only last
        # for one sched cycle, or else something is wrong. Assume the latter.
        if len(allRunningJobs) < 1 and len(waitingJobsMyVO) > 0:
            return 7777777

        # another silly case: someone just restarted the server.
        # symptom (at least on PBS): no free slots, lots of running
        # jobs, many (or all) of which have no 'start' record.
        # note this is not the only symptom ... there may be free
        # slots but still no jobs with historical info. Some code
        # later on to handle this in most_recent_run ...
        if lrms.slotsFree == 0:
            nr = len(allRunningJobs)
            nns = 0  # Number with No Start
            for j in allRunningJobs:
                if j.get('start') == None: nns = nns + 1
            # nr == 0 here (no free slots yet nothing running) is just
            # as inconsistent as mostly-missing history; checking it
            # first also avoids a ZeroDivisionError.
            if nr == 0 or float(nns)/nr >= 0.5:
                return 7777777

        nw = len(waitingJobsMyVO)
        if nw < 0:
            # cannot happen (len() >= 0); kept as a sanity check.
            # NOTE(review): the module never imports sys at top level,
            # so import locally to avoid a NameError if this ever fires.
            print("found negative number of queued jobs!")
            import sys
            sys.exit(1)
        elif nw == 0:
            if lrms.slotsFree > 0:
                return lrms.schedCycle / 2
            else:
                if len(allRunningJobs) == 0:
                    return lrms.schedCycle / 2
                else:
                    startList = []
                    for j in allRunningJobs:
                        tval = j.get('start')
                        if tval:  # jobs started during last poll cycle won't have 'start'
                            startList.append(tval)
                        else:
                            # BUGFIX: approximate a missing start as the
                            # timestamp "half a sched cycle ago", as the
                            # throughput branch below does; the old code
                            # appended the bare duration schedCycle/2 to
                            # a list of epoch timestamps.
                            startList.append(lrms.now - lrms.schedCycle/2)
                    startList.sort()  # last element will be most recent start
                    time_sys_full = lrms.now - startList[-1]
                    return time_sys_full + lrms.schedCycle/2
        else:  # there are waiting jobs

            # first check one special case: the one where *all* waiting jobs
            # have been in the system for less than the schedule cycle
            # if there are enough slots to run them all, skip the throughput
            # calculation.
            waitList = []
            for j in waitingJobsMyVO:
                waitList.append(lrms.now - j.get('qtime'))
            waitList.sort()
            if waitList[-1] < 1.1*lrms.schedCycle \
               and len(waitList) < lrms.slotsFree:
                return lrms.schedCycle / 2

            # calcs based on throughput. we know since we checked above that at
            # least *some* jobs are running. Use jobs from our VO if we have
            # them, otherwise use list of all running jobs.
            def vorunfunc(job):
                return job.get('state') == 'running' and job.get('group') == vo
            runningJobsMyVO = lrms.matchingJobs(vorunfunc)
            if len(runningJobsMyVO) > 0:
                runlist = runningJobsMyVO
            else:
                runlist = allRunningJobs
            startList = []
            for j in runlist:
                st = j.get('start')
                if st:
                    startList.append(st)
                else:
                    startList.append(lrms.now - lrms.schedCycle/2)
            startList.sort()

            # decide how many jobs to use to calc throughput; take minimum
            # of number running, number waiting, and the 'throughput window'
            # that can be set on the algorithm. Note we have to use the
            # startList instead of runlist since there may be starts for
            # which we have no records yet (PBS caching).
            num = min(nw, len(startList), self.twin)
            period = lrms.now - startList[-num]
            if period <= 0:
                # every sampled job started "now" (or clocks are skewed);
                # fall back to the optimistic half-cycle guess rather
                # than dividing by zero below.
                return lrms.schedCycle / 2
            thruput = float(num)/period  # jobs per sec

            if len(runningJobsMyVO) > 0:
                return int((nw+1) / thruput)
            else:
                # jobs are running but not yours, so assume you need to finish all
                # waiting jobs before your new one starts
                naw = len(lrms.jobs()) - len(runlist)  # counts all waiting jobs
                return int((naw+1) / thruput)
179 |
|
|
|
180 |
|
|
# the "context" controls the strategy:
181 |
|
|
class TTEstimator(object):
    """Context object of the strategy pattern.

    Holds one FindEstimate-style strategy and forwards estimate
    requests to it.
    """

    def __init__(self, strategy):
        # strategy: any object implementing FindEstimate.algorithm()
        self.strategy = strategy

    def estimate(self, lrms, vo, debug):
        """Delegate the traversal-time estimate to the configured strategy."""
        return self.strategy.algorithm(lrms, vo, debug)
186 |
|
|
|
187 |
|
|
# there are several algorithms for what to do with queued jobs.
# define them here and provide means to select between them.
189 |
|
|
|
190 |
|
|
def ett_longest_queue_time(server, list):
    """Estimate traversal time as the wait of the longest-queued job.

    server -- LRMS snapshot; only server.now (epoch seconds) is read
    list   -- job records; each is queried via .get('state') / .get('qtime')
               (param name kept for interface compatibility even though
               it shadows the builtin)

    Returns the longest current wait (seconds) among queued jobs, or
    the 7777777 "don't know" sentinel when nothing in *list* is queued.
    """
    qtlist = []
    for j in list:
        if j.get('state') == 'queued':
            qtlist.append(server.now - j.get('qtime'))

    # BUGFIX: the original indexed qtlist[-1] unconditionally, raising
    # IndexError on an empty list; the est = 7777777 fallback it
    # initialized was unreachable.  Make the fallback real.
    if not qtlist:
        return 7777777

    qtlist.sort()
    return qtlist[-1]  # sorted list so this is the longest waiting.
201 |
|
|
|
202 |
|
|
def ett_avg_queue_time(server, list):
    """Estimate traversal time from the average wait of queued jobs.

    server -- LRMS snapshot; only server.now (epoch seconds) is read
    list   -- job records; each is queried via .get('state') / .get('qtime')
               (param name kept for interface compatibility even though
               it shadows the builtin)

    Returns int(2 * average wait) -- the average time in queue is
    likely to be half of the expected total time in queue -- or the
    7777777 "don't know" sentinel when nothing in *list* is queued.
    """
    # find average time in queue of all queued jobs
    count = 0
    total = 0.0  # renamed from 'sum' to stop shadowing the builtin
    for j in list:
        if j.get('state') == 'queued':
            count = count + 1
            total = total + (server.now - j.get('qtime'))

    # BUGFIX: the original divided by count unconditionally, raising
    # ZeroDivisionError when no job was queued; the est = 7777777
    # fallback it initialized was unreachable.  Make the fallback real.
    if count == 0:
        return 7777777

    avgtime = total / count
    # avg time in queue likely to be half of expected time in queue
    # => return twice avg as estimate.
    return int(2 * avgtime)
217 |
|
|
|
218 |
|
|
# make dict of functions to allow run-time selection
# (maps the 'algorithm' keyword of ett() onto the implementing function)
_ALGS = {
    'average' : ett_avg_queue_time,
    'longest' : ett_longest_queue_time }
222 |
|
|
|
223 |
|
|
# adjusted_ett provides a central place to make any changes to return
# values (it could be used e.g. to change units); right now it only
# floors raw estimates shorter than one scheduling period to half a
# period.
def adjusted_ett(rawval, period):
    """Return rawval as an int, mapping anything below period to period/2."""
    return int(period / 2.) if rawval < period else int(rawval)
231 |
|
|
|
232 |
|
|
def ett(server,queue,voname='',algorithm='longest',debug=0) : |
233 |
|
|
|
234 |
|
|
if debug: print 'running for VO', voname |
235 |
|
|
|
236 |
|
|
jfilt = {'state' : 'queued'} |
237 |
|
|
|
238 |
|
|
if queue != '': jfilt['queue'] = queue |
239 |
|
|
if voname not in ['', None]: jfilt['group'] = voname |
240 |
|
|
|
241 |
|
|
jobsrunning = {'state': 'running'} |
242 |
|
|
|
243 |
|
|
nq = server.nmatch(jfilt) |
244 |
|
|
if debug: print 'found', nq, 'queued jobs in queue \'' + queue + \ |
245 |
|
|
'\' for VO', voname |
246 |
|
|
if nq > 0: # there are queued jobs for this VO |
247 |
|
|
est = _ALGS[algorithm](server,server.jobs_last_query()) |
248 |
|
|
elif server.slotsFree > 0 : |
249 |
|
|
if debug: print 'found', server.slotsFree, 'free job slots' |
250 |
|
|
est = 0 |
251 |
|
|
else : |
252 |
|
|
server.nmatch(jobsrunning) # get all running jobs |
253 |
|
|
est = ett_most_recent_run(server.now, |
254 |
|
|
server.jobs_last_query(), |
255 |
|
|
debug) |
256 |
|
|
|
257 |
|
|
return adjusted_ett(est,server.schedCycle) |
258 |
|
|
|
259 |
|
|
def ett_most_recent_run(now, joblist, debug=0):
    """Estimate traversal time from the most recently started job.

    now     -- current time (epoch seconds)
    joblist -- job records; each job's .get('start') is consulted
    debug   -- nonzero prints debugging info

    Returns the seconds elapsed since the most recent recorded job
    start (i.e. how long the system has been full), or the 7777777
    "i don't know" sentinel when no job carries a 'start' record
    (e.g. right after an LRMS server restart).
    """

    # make list of the start times we actually have; jobs started
    # during the last poll cycle may not carry a 'start' record yet
    starts = list()
    for j in joblist:
        st = j.get('start')
        if st:
            starts.append(st)

    if len(starts) > 0:
        starts.sort()
        last_started = starts[-1]  # last in list - start with largest timestamp
        # single-argument print(...) keeps the exact same output on
        # both python 2 and python 3
        if debug: print('%s %s' % (now, last_started))
        timefilled = now - last_started
        if debug: print('most recent run %s' % timefilled)
        return timefilled
    else:
        # BUGFIX: was 777777 (six 7s); every other code path in this
        # module uses 7777777 as the "i don't know" sentinel
        return 7777777