/[pdpsoft]/nl.nikhef.pdp.dynsched/trunk/EstTT.py
ViewVC logotype

Annotation of /nl.nikhef.pdp.dynsched/trunk/EstTT.py

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1999 - (hide annotations) (download) (as text)
Fri Oct 8 12:18:22 2010 UTC (11 years, 7 months ago) by templon
File MIME type: application/x-python
File size: 10573 byte(s)
still transferring stuff over from old tree

1 templon 1999 #--
2     # EstTT.py -- class for ESTimating Traversal Times via various
3     # algorithms. Use "strategy" design pattern to handle alg
4     # framework.
5     # J. A. Templon, NIKHEF, PDP Group
6     # $Id: EstTT.py,v 1.7 2005/12/02 15:00:41 templon Exp $
7     #--
8    
9     # the strategy interface (ie generic algorithm interface):
class FindEstimate(object):
    """Strategy interface (i.e. the generic algorithm interface).

    Concrete traversal-time algorithms subclass this and override
    algorithm(); TTEstimator invokes it through its `strategy` attribute.
    """

    def algorithm(self, lrms, vo, debug):
        """Compute a traversal-time estimate; the base version is a no-op.

        lrms  -- concrete instance of class lrms (snapshot of LRMS state)
        vo    -- virtual organization for which you want an estimate;
                 so far this usually maps to the unix group of the mapped user
        debug -- 0 means don't debug, 1 means print debugging info

        Returns None here; subclasses return the actual estimate.
        """
19    
20     # put concrete algorithm instances here
21    
class Gott(FindEstimate):
    """The original Gott ideas: estimate from the *average* queue time.

    To use the old scheme of looking at jobs matching both the VO AND the
    queue, as the old system did, set the queue before estimating, e.g.

        Estimator.strategy.queue = 'atlas'

    and only then call Estimator.estimate().
    """

    def __init__(self):
        FindEstimate.__init__(self)
        # Queue to restrict matching to; '' means "don't filter on queue"
        # (ett() only adds a queue filter when this is non-empty).
        self.queue = ''

    def algorithm(self, lrms, vo, debug):
        """Delegate to ett() with the 'average' queue-time algorithm."""
        return ett(lrms, self.queue, vo, debug=debug, algorithm='average')
34    
class WaitTime(FindEstimate):
    """Gott modified to use the *longest* wait time instead of the average.

    To use the old scheme of looking at jobs matching both the VO AND the
    queue, as the old system did, set the queue before estimating, e.g.

        Estimator.strategy.queue = 'atlas'

    and only then call Estimator.estimate().
    """

    def __init__(self):
        FindEstimate.__init__(self)
        # Queue to restrict matching to; '' means "don't filter on queue"
        # (ett() only adds a queue filter when this is non-empty).
        self.queue = ''

    def algorithm(self, lrms, vo, debug):
        """Delegate to ett() with the 'longest' queue-time algorithm."""
        return ett(lrms, self.queue, vo, debug=debug, algorithm='longest')
48    
class Throughput(FindEstimate):
    """Estimate traversal time from the observed job-start throughput.

    The estimate is how long it will take to empty all waiting jobs for
    the desired VO, based on the observed throughput.  Motivation is that
    Gott & WaitTime do very poorly when the job population is not stable,
    i.e. when the rates of job arrival and job starts are not roughly the
    same.
    """

    # Value returned when no sensible estimate can be made (system looks
    # broken or was just restarted).
    _UNKNOWN = 7777777

    def __init__(self):
        FindEstimate.__init__(self)
        # Tunable "Throughput WINdow": how far back (how many jobs) in the
        # system start history to look when calculating job throughput.
        # I.e. if twin is 4, the code looks at no more than the last four
        # jobs started to see how many jobs per second are being started.
        # Values below 1 are treated as 1.
        self.twin = 10

    def algorithm(self, lrms, vo, debug):
        """Return the estimated traversal time for a new job of `vo`.

        lrms  -- snapshot of LRMS state; this method reads slotsUp,
                 slotsFree, schedCycle, now, matchingJobs() and jobs().
        vo    -- group name compared against each job's 'group' field.
        debug -- accepted for interface compatibility; not used here.

        Returns 7777777 when no sensible estimate can be made.
        """
        # get rid of the silliest case first: system has no job slots at all
        if lrms.slotsUp < 1:
            return self._UNKNOWN

        allRunningJobs = lrms.matchingJobs(
            lambda job: job.get('state') == 'running')
        waitingJobsMyVO = lrms.matchingJobs(
            lambda job: job.get('state') == 'queued' and job.get('group') == vo)

        # another very silly case: this VO has waiting jobs, but absolutely
        # no jobs are running.  This is either a short transient that will
        # only last for one sched cycle, or else something is wrong.
        # Assume the latter.
        if len(allRunningJobs) < 1 and len(waitingJobsMyVO) > 0:
            return self._UNKNOWN

        # another silly case: someone just restarted the server.  Symptom
        # (at least on PBS): no free slots, lots of running jobs, many (or
        # all) of which have no 'start' record.  Note this is not the only
        # symptom ... there may be free slots but still no jobs with
        # historical info; ett_most_recent_run copes with that for the
        # older algorithms.
        if lrms.slotsFree == 0:
            nr = len(allRunningJobs)
            nns = len([j for j in allRunningJobs if j.get('start') is None])
            # nr == 0 (no free slots yet nothing running) used to divide by
            # zero; it now falls through to the no-waiting-jobs estimate.
            if nr > 0 and float(nns) / nr >= 0.5:
                return self._UNKNOWN

        nw = len(waitingJobsMyVO)
        if nw == 0:
            return self._no_waiting_estimate(lrms, allRunningJobs)
        return self._throughput_estimate(lrms, vo, nw, allRunningJobs,
                                         waitingJobsMyVO)

    def _no_waiting_estimate(self, lrms, allRunningJobs):
        """Estimate for a VO that has no waiting jobs."""
        half_cycle = lrms.schedCycle / 2
        if lrms.slotsFree > 0 or len(allRunningJobs) == 0:
            return half_cycle
        # System is full: base the estimate on how long it has been since
        # the most recent job start.  Jobs started during the last poll
        # cycle have no 'start' yet; approximate their start *timestamp* as
        # half a cycle ago (the old code appended the bare duration here).
        starts = [j.get('start') or (lrms.now - half_cycle)
                  for j in allRunningJobs]
        time_sys_full = lrms.now - max(starts)
        return time_sys_full + half_cycle

    def _throughput_estimate(self, lrms, vo, nw, allRunningJobs,
                             waitingJobsMyVO):
        """Estimate from job-start throughput when `vo` has nw > 0 waiting."""
        half_cycle = lrms.schedCycle / 2

        # Special case: *all* waiting jobs have been in the system for less
        # than the schedule cycle and there are enough free slots to run
        # them all plus the new job; skip the throughput calculation.
        longest_wait = max([lrms.now - j.get('qtime')
                            for j in waitingJobsMyVO])
        if longest_wait < 1.1 * lrms.schedCycle and nw < lrms.slotsFree:
            return half_cycle

        # Calcs based on throughput.  We know from the checks above that at
        # least *some* jobs are running.  Use jobs from our VO if we have
        # them, otherwise use the list of all running jobs.
        runningJobsMyVO = lrms.matchingJobs(
            lambda job: job.get('state') == 'running' and job.get('group') == vo)
        if len(runningJobsMyVO) > 0:
            runlist = runningJobsMyVO
        else:
            runlist = allRunningJobs
        startList = sorted([j.get('start') or (lrms.now - half_cycle)
                            for j in runlist])

        # Decide how many jobs to use to calc throughput: the minimum of the
        # number running, number waiting, and the throughput window.  Use
        # startList rather than runlist since there may be starts for which
        # we have no records yet (PBS caching).  Clamp to at least 1 so a
        # twin < 1 cannot produce a zero throughput.
        num = max(1, min(nw, len(startList), self.twin))
        # A start stamped exactly at `now` (or later, with clock skew) would
        # give a zero/negative period; clamp to one time unit (presumably
        # seconds).
        period = max(lrms.now - startList[-num], 1)
        thruput = float(num) / period  # jobs per time unit

        if len(runningJobsMyVO) > 0:
            return int((nw + 1) / thruput)
        # Jobs are running but not yours, so assume you need to finish all
        # waiting jobs before your new one starts.  This counts every job
        # that is not running (assumes lrms.jobs() lists all jobs).
        naw = len(lrms.jobs()) - len(runlist)
        return int((naw + 1) / thruput)
177    
178     # the "context" controls the strategy:
class TTEstimator(object):
    """Context of the strategy pattern: forwards estimates to a strategy."""

    def __init__(self, strategy):
        # Any object exposing algorithm(lrms, vo, debug); callers may swap
        # it, or tune its attributes, after construction.
        self.strategy = strategy

    def estimate(self, lrms, vo, debug):
        """Return whatever the current strategy's algorithm() returns."""
        chosen = self.strategy
        return chosen.algorithm(lrms, vo, debug)
184    
185     # there are several algorithms for what to do with queued jobs.
186     # define them here and provide means to select between them.
187    
def ett_longest_queue_time(server, list):
    """Return the longest time any queued job in `list` has been waiting.

    server -- object providing `now`, the current timestamp.
    list   -- iterable of job mappings; only jobs whose 'state' is 'queued'
              are considered, each using its 'qtime' (queue-entry time).
              (The parameter name shadows the builtin but is kept for
              interface compatibility.)

    Returns 7777777 ("don't know") when there are no queued jobs, instead
    of raising IndexError as before.
    """
    waits = [server.now - j.get('qtime')
             for j in list if j.get('state') == 'queued']
    if not waits:
        return 7777777
    return max(waits)
199    
def ett_avg_queue_time(server, list):
    """Return twice the average time queued jobs in `list` have waited.

    server -- object providing `now`, the current timestamp.
    list   -- iterable of job mappings; only jobs whose 'state' is 'queued'
              are considered, each using its 'qtime' (queue-entry time).
              (The parameter name shadows the builtin but is kept for
              interface compatibility.)

    The average time in queue is likely to be half of the expected time in
    queue, so twice the average (truncated to int) is the estimate.
    Returns 7777777 ("don't know") when there are no queued jobs, instead
    of raising ZeroDivisionError as before.
    """
    waits = [server.now - j.get('qtime')
             for j in list if j.get('state') == 'queued']
    if not waits:
        return 7777777
    # Start the sum at 0.0 so the mean is a float, exactly as before.
    avgtime = sum(waits, 0.0) / len(waits)
    return int(2 * avgtime)
215    
# Dispatch table for run-time selection of the queued-job algorithm:
# ett()'s `algorithm` argument is looked up here.
_ALGS = dict(
    average=ett_avg_queue_time,
    longest=ett_longest_queue_time,
)
220    
def adjusted_ett(rawval, period):
    """Central place to post-process the raw estimates computed by ett().

    Any estimate shorter than `period` (ett passes the scheduler cycle) is
    replaced by half a period, truncated to int; every other value is just
    truncated to int.  This could also be used e.g. to change units.
    """
    below_floor = rawval < period
    return int(period / 2.) if below_floor else int(rawval)
229    
230     def ett(server,queue,voname='',algorithm='longest',debug=0) :
231    
232     if debug: print 'running for VO', voname
233    
234     jfilt = {'state' : 'queued'}
235    
236     if queue != '': jfilt['queue'] = queue
237     if voname not in ['', None]: jfilt['group'] = voname
238    
239     jobsrunning = {'state': 'running'}
240    
241     nq = server.nmatch(jfilt)
242     if debug: print 'found', nq, 'queued jobs in queue \'' + queue + \
243     '\' for VO', voname
244     if nq > 0: # there are queued jobs for this VO
245     est = _ALGS[algorithm](server,server.jobs_last_query())
246     elif server.slotsFree > 0 :
247     if debug: print 'found', server.slotsFree, 'free job slots'
248     est = 0
249     else :
250     server.nmatch(jobsrunning) # get all running jobs
251     est = ett_most_recent_run(server.now,
252     server.jobs_last_query(),
253     debug)
254    
255     return adjusted_ett(est,server.schedCycle)
256    
def ett_most_recent_run(now, joblist, debug=0):
    """Return how long ago the most recently started job in joblist began.

    now     -- current timestamp.
    joblist -- iterable of job mappings; jobs whose 'start' is missing or
               falsy are ignored.
    debug   -- print diagnostic output when true.

    Returns 777777 (error code for "I don't know") when no job has a start
    time.  NOTE(review): other "unknown" sentinels in this module are
    7777777 (seven 7s); confirm whether 777777 here is intentional.
    """
    starts = [j.get('start') for j in joblist if j.get('start')]
    if not starts:
        return 777777  # error code for 'i don't know'

    last_started = max(starts)  # largest timestamp == most recent start
    # Single-expression prints give the same output on Python 2 and 3.
    if debug:
        print(' '.join([str(now), str(last_started)]))
    timefilled = now - last_started
    if debug:
        print(' '.join(['most recent run', str(timefilled)]))
    return timefilled

Properties

Name Value
svn:eol-style native

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28