/[pdpsoft]/nl.nikhef.pdp.dynsched/trunk/EstTT.py
ViewVC logotype

Annotation of /nl.nikhef.pdp.dynsched/trunk/EstTT.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2007 - (hide annotations) (download) (as text)
Fri Oct 8 12:50:40 2010 UTC (11 years, 7 months ago) by templon
File MIME type: application/x-python
File size: 10541 byte(s)
added header lines

1 templon 1999 #--
2     # EstTT.py -- class for ESTimating Traversal Times via various
3     # algorithms. Use "strategy" design pattern to handle alg
4     # framework.
5     # J. A. Templon, NIKHEF, PDP Group
6 templon 2007 # $Id$
7     # Source: $URL$
8    
9 templon 1999 #--
10    
11     # the strategy interface (ie generic algorithm interface):
class FindEstimate(object):
    """Strategy interface: base class for traversal-time estimation algorithms.

    Concrete strategies subclass this and override ``algorithm``.
    """

    def algorithm(self, lrms, vo, debug):
        """Return an estimated traversal time for a new job of *vo*.

        lrms  -- concrete instance of class lrms (snapshot of LRMS state)
        vo    -- virtual organization for which the estimate is wanted;
                 so far this usually maps to the unix group of the mapped user
        debug -- 0 means do not debug, 1 means print debugging info

        This base implementation computes nothing and returns None.
        """
        return None
21    
22     # put concrete algorithm instances here
23    
class Gott(FindEstimate):
    """Strategy embodying the original Gott ideas (average queued time).

    To reproduce the old scheme of looking at jobs matching both the VO
    AND the queue, set the queue before calling the estimator, e.g.
        Estimator.strategy.queue = 'atlas'
    before actually using the Estimator.estimate() method.
    """

    def __init__(self):
        super(Gott, self).__init__()
        # queue name to match in addition to the VO; '' means any queue
        self.queue = ''

    def algorithm(self, lrms, vo, debug):
        """Delegate to ett() with the 'average' queued-time algorithm."""
        chosen_queue = self.queue
        return ett(lrms, chosen_queue, vo, algorithm='average', debug=debug)
36    
class WaitTime(FindEstimate):
    """Gott variant that uses the longest wait time instead of the average.

    To reproduce the old scheme of looking at jobs matching both the VO
    AND the queue, set the queue before calling the estimator, e.g.
        Estimator.strategy.queue = 'atlas'
    before actually using the Estimator.estimate() method.
    """

    def __init__(self):
        super(WaitTime, self).__init__()
        # queue name to match in addition to the VO; '' means any queue
        self.queue = ''

    def algorithm(self, lrms, vo, debug):
        """Delegate to ett() with the 'longest' queued-time algorithm."""
        chosen_queue = self.queue
        return ett(lrms, chosen_queue, vo, algorithm='longest', debug=debug)
50    
class Throughput(FindEstimate):
    """Estimate traversal time from the observed job-start throughput.

    The estimate is how long it will take to start all waiting jobs for the
    desired VO, given the rate (jobs per second) at which jobs have recently
    been started.  Motivation: Gott and WaitTime do very poorly when the job
    population is not stable, i.e. when the rates of job arrival and job
    starts are not roughly the same.
    """

    def __init__(self):
        FindEstimate.__init__(self)
        # Tunable "Throughput WINdow": how far back (how many job starts) in
        # the system start history to look when calculating job throughput.
        # E.g. if twin is 4, at most the last four started jobs are used to
        # see how many jobs per second are being started.
        self.twin = 10

    def algorithm(self, lrms, vo, debug):
        """Return an estimated traversal time (seconds) for a new *vo* job.

        lrms  -- LRMS snapshot; this method reads slotsUp, slotsFree, now,
                 schedCycle, matchingJobs(predicate) and jobs()
        vo    -- VO for which the estimate is wanted; compared against each
                 job's 'group'
        debug -- accepted for interface compatibility; not used here

        Returns 7777777 when no meaningful estimate can be made.
        """
        unknown = 7777777

        # Silliest case first: the system has no job slots at all.
        if lrms.slotsUp < 1:
            return unknown

        def allrunfunc(job):
            return job.get('state') == 'running'

        def vowaitfunc(job):
            return job.get('state') == 'queued' and job.get('group') == vo

        allRunningJobs = lrms.matchingJobs(allrunfunc)
        waitingJobsMyVO = lrms.matchingJobs(vowaitfunc)
        nr = len(allRunningJobs)
        nw = len(waitingJobsMyVO)

        # This VO has waiting jobs, but absolutely nothing is running.  Either
        # a short transient lasting one sched cycle, or something is wrong;
        # assume the latter.
        if nr < 1 and nw > 0:
            return unknown

        # Someone probably just restarted the server.  Symptom (at least on
        # PBS): no free slots, lots of running jobs, many (or all) of which
        # have no 'start' record.  nr > 0 is required so that a full system
        # with no running jobs cannot cause a division by zero here.
        if lrms.slotsFree == 0 and nr > 0:
            nns = len([j for j in allRunningJobs if j.get('start') is None])
            if float(nns) / nr >= 0.5:
                return unknown

        now = lrms.now
        half_cycle = lrms.schedCycle / 2
        # Running jobs started during the last poll cycle have no 'start'
        # record yet; approximate their start time as half a cycle ago.
        approx_start = now - half_cycle

        if nw == 0:
            # Nothing of ours is waiting.  With free slots (or nothing running
            # at all) a new job should start within about half a cycle.
            if lrms.slotsFree > 0 or nr == 0:
                return half_cycle
            # System is full: estimate how long it has been full (time since
            # the most recent job start) plus half a scheduling cycle.
            last_start = max(j.get('start') or approx_start
                             for j in allRunningJobs)
            return (now - last_start) + half_cycle

        # There are waiting jobs.  Special case: all of them have been in the
        # system for less than ~one scheduling cycle and there are enough
        # free slots to run them all, so skip the throughput calculation.
        longest_wait = max(now - j.get('qtime') for j in waitingJobsMyVO)
        if longest_wait < 1.1 * lrms.schedCycle and nw < lrms.slotsFree:
            return half_cycle

        def vorunfunc(job):
            return job.get('state') == 'running' and job.get('group') == vo

        # Throughput calculation.  We know some jobs are running (checked
        # above).  Use jobs from our VO if there are any, else all jobs.
        runningJobsMyVO = lrms.matchingJobs(vorunfunc)
        if len(runningJobsMyVO) > 0:
            runlist = runningJobsMyVO
        else:
            runlist = allRunningJobs
        startList = sorted(j.get('start') or approx_start for j in runlist)

        # Number of starts to use: minimum of number waiting, number of start
        # times and the throughput window; at least 1 so a misconfigured twin
        # cannot yield zero throughput.
        num = max(1, min(nw, len(startList), self.twin))
        period = now - startList[-num]
        if period <= 0:
            # A start at/after "now" (same-second start or clock skew) would
            # give a division by zero or a negative throughput.
            period = 1
        thruput = float(num) / period  # jobs per second

        if len(runningJobsMyVO) > 0:
            return int((nw + 1) / thruput)
        # Jobs are running but none are yours: assume all waiting jobs must
        # start before your new one does.  All jobs minus running jobs is
        # taken as the number of waiting jobs.
        naw = len(lrms.jobs()) - len(runlist)
        return int((naw + 1) / thruput)
179    
180     # the "context" controls the strategy:
class TTEstimator(object):
    """Strategy-pattern context: holds a strategy and delegates to it.

    The strategy is stored on the ``strategy`` attribute, so it can be
    replaced or tuned at run time (e.g. ``estimator.strategy.queue = 'atlas'``).
    """

    def __init__(self, strategy):
        self.strategy = strategy

    def estimate(self, lrms, vo, debug):
        """Return the result of the current strategy's algorithm()."""
        run_algorithm = self.strategy.algorithm
        return run_algorithm(lrms, vo, debug)
186    
187     # there are several algorithms for what to do with queued jobs.
188     # define them here and provide means to select between them.
189    
def ett_longest_queue_time(server, list):
    """Return the longest time any queued job in *list* has been waiting.

    server -- object providing ``now`` (same time base as each job's 'qtime')
    list   -- iterable of job records supporting ``.get('state')`` and
              ``.get('qtime')``; the parameter name shadows the builtin but is
              kept for backward compatibility
    Returns 7777777 ("don't know") when *list* contains no queued jobs,
    instead of raising IndexError.
    """
    waits = [server.now - j.get('qtime')
             for j in list if j.get('state') == 'queued']
    if not waits:
        return 7777777
    return max(waits)
201    
def ett_avg_queue_time(server, list):
    """Estimate traversal time as twice the average wait of queued jobs.

    The average time already spent in the queue is likely to be half the
    expected total time in the queue, so twice the average is returned.

    server -- object providing ``now`` (same time base as each job's 'qtime')
    list   -- iterable of job records supporting ``.get('state')`` and
              ``.get('qtime')``; the parameter name shadows the builtin but is
              kept for backward compatibility
    Returns an int, or 7777777 ("don't know") when *list* contains no queued
    jobs, instead of raising ZeroDivisionError.
    """
    waits = [server.now - j.get('qtime')
             for j in list if j.get('state') == 'queued']
    if not waits:
        return 7777777
    # start the sum at 0.0 to keep the float accumulation of the original
    avgtime = sum(waits, 0.0) / len(waits)
    return int(2 * avgtime)
217    
# Queued-job algorithms selectable at run time by ett(); the keys are the
# values accepted for its ``algorithm`` argument.
_ALGS = dict(
    average=ett_avg_queue_time,
    longest=ett_longest_queue_time,
)
222    
def adjusted_ett(rawval, period):
    """Central place to post-process estimates before they are returned.

    A raw estimate smaller than *period* is replaced by half of *period*;
    otherwise the raw estimate is kept.  Either way the result is truncated
    to an int.  Could also be used e.g. to change units.
    """
    chosen = period / 2. if rawval < period else rawval
    return int(chosen)
231    
def ett(server, queue, voname='', algorithm='longest', debug=0):
    """Estimate traversal time using the original queued-job scheme.

    server    -- LRMS snapshot; this function reads now, slotsFree and
                 schedCycle and calls nmatch(filter) / jobs_last_query()
    queue     -- queue name to match; '' matches any queue
    voname    -- VO (job 'group') to match; '' or None matches any VO
    algorithm -- key into _ALGS ('average' or 'longest') used when matching
                 queued jobs exist
    debug     -- truthy to print debugging info

    With matching queued jobs the selected algorithm is applied.  Otherwise
    the raw estimate is 0 if there are free slots, else the time since the
    most recent running-job start.  The result goes through adjusted_ett().
    """
    if debug:
        print('running for VO %s' % (voname,))

    queued_filter = dict(state='queued')
    if queue != '':
        queued_filter['queue'] = queue
    if voname not in ('', None):
        queued_filter['group'] = voname

    n_queued = server.nmatch(queued_filter)
    if debug:
        print("found %s queued jobs in queue '%s' for VO %s"
              % (n_queued, queue, voname))

    if n_queued > 0:
        # jobs_last_query() presumably returns the jobs matched by the
        # nmatch() call just above -- NOTE(review): confirm in lrms class
        raw = _ALGS[algorithm](server, server.jobs_last_query())
    elif server.slotsFree > 0:
        if debug:
            print('found %s free job slots' % (server.slotsFree,))
        raw = 0
    else:
        server.nmatch({'state': 'running'})  # select all running jobs
        raw = ett_most_recent_run(server.now, server.jobs_last_query(), debug)

    return adjusted_ett(raw, server.schedCycle)
258    
def ett_most_recent_run(now, joblist, debug=0):
    """Return the time elapsed since the most recent job start in *joblist*.

    now     -- current time, same time base as each job's 'start'
    joblist -- iterable of job records supporting ``.get('start')``; records
               with a missing/falsy 'start' are ignored
    debug   -- truthy to print debugging info

    Returns 777777 as an "I don't know" code when no start times are known.
    """
    start_times = [st for st in (j.get('start') for j in joblist) if st]

    if not start_times:
        # NOTE(review): other "unknown" values in this module are 7777777
        # (seven digits); confirm whether 777777 here is intentional.
        return 777777

    last_started = sorted(start_times)[-1]  # largest start timestamp
    if debug:
        print('%s %s' % (now, last_started))
    timefilled = now - last_started
    if debug:
        print('most recent run %s' % (timefilled,))
    return timefilled

Properties

Name Value
svn:eol-style native
svn:keywords Id URL

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28