
Diff of /nl.nikhef.ndpf.mcfloat/trunk/mcfloat


revision 2841 by templon, Mon Jan 25 18:45:02 2016 UTC revision 2842 by templon, Mon Jan 25 19:59:44 2016 UTC
# Line 1  Line 1 
1  #!/usr/bin/env python  #!/usr/bin/env python
 from torque_utils import pbsnodes  
2    
3  # this config allows mcore to run up to full ATLAS allocation (07.04.2015)  # Class to hold parameters:
4    
5    class mcfloat_cfg:
6        def __init__(self):   # default values; adjust for your site
7            self.maxdrain = 32
8            self.maxfree = 49
9            self.maxfreepernode = 3
10            self.nodespergrab = 3
11            self.capacityfrac = 0.47
12            self.mcqueue = 'atlasmc'
13            self.underpop_nodes_file = "/var/db/nodes_with_too_few_jobs"
14            self.torque = "stro.nikhef.nl"
15    
16    MCFLOAT_CONFIG_FILE = "/var/db/mcfloat_config"
17    
18    # the above config allows mcore to run up to full ATLAS allocation (07.04.2015)
19  # full allocation right now is 47% of "non-retired" capacity = knal + mars + car  # full allocation right now is 47% of "non-retired" capacity = knal + mars + car
20  # just configure 47% of each class.  # just configure 47% of each class.
21    
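The class above holds the defaults and MCFLOAT_CONFIG_FILE names the pickle that carries the values actually used at run time. A minimal sketch of seeding or inspecting that pickle by hand, assuming the mcfloat_cfg definition above is in scope (for example pasted into a Python prompt on the batch server) and that the invoking user may write to /var/db; illustrative only, not part of the committed script:

import os, pickle

CFG = "/var/db/mcfloat_config"          # same path as MCFLOAT_CONFIG_FILE above

if not os.path.isfile(CFG):             # seed the file with the class defaults
    f = open(CFG, 'wb')
    pickle.dump(mcfloat_cfg(), f)
    f.close()

f = open(CFG, 'rb')                     # read back and show the effective values
cfg = pickle.load(f)
f.close()
print "capacityfrac=%s maxfree=%d maxdrain=%d" % (cfg.capacityfrac, cfg.maxfree, cfg.maxdrain)
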
22  MAXDRAIN = 32            # max num of nodes allowed to drain  # override when lsgrid is running (legacy comments, might be useful)
 MAXFREE  = 49            # max num of free slots to tolerate  
 MAXFREEPERNODE = 3       # standard for four-core jobs  
 NODESPERGRAB = 3         # number of nodes to grab each time around  
   
 # override when lsgrid is running  
23    
24  # MAXFREEPERNODE = 29       # allow lsgrid 30-core jobs  # MAXFREEPERNODE = 29       # allow lsgrid 30-core jobs
25  # MAXFREE  = 149            # max num of free slots to tolerate  # MAXFREE  = 149            # max num of free slots to tolerate
# Line 20  NODESPERGRAB = 3         # number of nod Line 29  NODESPERGRAB = 3         # number of nod
29  # MAXFREE  = 149            # max num of free slots to tolerate  # MAXFREE  = 149            # max num of free slots to tolerate
30  # MAXFREEPERNODE = 9       # allow bbmri 10-core jobs  # MAXFREEPERNODE = 9       # allow bbmri 10-core jobs
31    
 # parameterize percent capacity -- 47% is too conservative, make this variable.  
   
   
 # original car was [ 'wn-car-0%02d.farm.nikhef.nl' % (n) for n in range( 96-int(CAPACFRAC*96),  97) ]  
 # temporarily changed due to low-numbered cars offline and reserved and such.  
   
 CAPACFRAC = 0.85  
   
 NUM_CAR  = int(CAPACFRAC * 96)  
 NUM_KNAL = int(CAPACFRAC * 18)  
 NUM_MARS = int(CAPACFRAC * 58)  
   
 CANDIDATE_NODES = [ 'wn-car-0%02d.farm.nikhef.nl' % (n)  for n in range( 1, NUM_CAR+1) ] + \  
                   [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range( 1, NUM_KNAL+1) ] + \  
                   [ 'wn-mars-0%02d.farm.nikhef.nl' % (n) for n in range( 1, NUM_MARS+1) ]  
   
 MCQUEUE  = 'atlasmc'  
   
 # settings below appropriate for knal nodes (32 cores)  
 # ... want to keep num of draining nodes  
 # to MAXFREE / 3 since giving back a node could be expensive.  
 # it might already be running several mc jobs  
   
 # MAXDRAIN = 16            # max num of nodes allowed to drain  
 # MAXFREE  = 49            # max num of free slots to tolerate  
 # MAXFREEPERNODE = 3  
 # NODESPERGRAB = 1  
 # CANDIDATE_NODES = [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range(1,19) ]  
 # MCQUEUE  = 'atlasmc'  
   
 # settings below more appropriate for smrt nodes (8 core)  
 # here giving a node back doesn't really matter, it wasn't  
 # running any mc jobs yet anyway.  better to grab lots  
 # of nodes and give back the ones that drain the slowest if  
 # too many slots are free.  
   
 # MAXDRAIN = 16            # max num of nodes allowed to drain  
 # MAXFREE  = 49            # max num of free slots to tolerate  
 # CANDIDATE_NODES = [ 'wn-smrt-0%02d.farm.nikhef.nl' % (n) for n in range(1,73) ]  
   
 UNDERPOP_NODES_FILE = ".nodes_with_too_few_jobs"  
 TORQUE              = "stro.nikhef.nl"  
   
32  import os  import os
33  import pickle  import pickle
34  import sys  import sys
35  import subprocess  import subprocess
36    
37  def getmcjobinfo(nodes):  def getmcjobinfo(nodes, cfg):
38    
39  # input is the list of nodes for which to get info  # input is the list of nodes for which to get info
40  # function returns tuple (jobs_waiting, node_info)  # function returns tuple (jobs_waiting, node_info)
# Line 86  def getmcjobinfo(nodes): Line 52  def getmcjobinfo(nodes):
52      qsfname = tempfile.mktemp(".txt","qsmf",os.environ['HOME']+"/tmp")      qsfname = tempfile.mktemp(".txt","qsmf",os.environ['HOME']+"/tmp")
53    
54      import time      import time
55      os.system('/usr/bin/qstat -f @' + TORQUE + ' > ' + qsfname)      os.system('/usr/bin/qstat -f @' + cfg.torque + ' > ' + qsfname)
56      now = time.mktime(time.localtime())      now = time.mktime(time.localtime())
57    
58      jlist = torqueJobs.qs_parsefile(qsfname)      jlist = torqueJobs.qs_parsefile(qsfname)
# Line 123  def getmcjobinfo(nodes): Line 89  def getmcjobinfo(nodes):
89      mcjoblist = list()      mcjoblist = list()
90      waitingJobs = False      waitingJobs = False
91      for j in jlist:      for j in jlist:
92          if j['queue'] == MCQUEUE:          if j['queue'] == cfg.mcqueue:
93              newj = mapatts(j,usedkeys)              newj = mapatts(j,usedkeys)
94              mcjoblist.append(newj)              mcjoblist.append(newj)
95              if newj['job_state'] == 'queued':              if newj['job_state'] == 'queued':
# Line 214  p = optparse.OptionParser(description="M Line 180  p = optparse.OptionParser(description="M
180    
181  p.add_option("-A",action="store_true",dest="addall",default=False,  p.add_option("-A",action="store_true",dest="addall",default=False,
182               help="add all eligible nodes to multicore pool")               help="add all eligible nodes to multicore pool")
183    p.add_option("-D",action="store_true",dest="delall",default=False,
184                 help="delete all nodes from multicore pool")
185  p.add_option("-l",action="store",dest="logfile",default=None,  p.add_option("-l",action="store",dest="logfile",default=None,
186               help="log actions and information to LOGFILE (default stdout)")               help="log actions and information to LOGFILE (default stdout)")
187  p.add_option("-L",action="store",dest="loglevel",default="INFO",  p.add_option("-L",action="store",dest="loglevel",default="INFO",
# Line 222  p.add_option("-n",action="store_true",de Line 190  p.add_option("-n",action="store_true",de
190               help="don't do anything, just print what would have been done")               help="don't do anything, just print what would have been done")
191  p.add_option("-i",action="store_true",dest="info",default=False,  p.add_option("-i",action="store_true",dest="info",default=False,
192               help="print info on all nodes in multicore pool")               help="print info on all nodes in multicore pool")
193    p.add_option("-f",action="store",dest="newfrac",default=None,
194                 help="fraction (out of 1.0) of in-service nodes to commit to multicore pool")
195    
196  import logging  import logging
197    
# Line 241  else: Line 211  else:
211      logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',      logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
212                          level=numeric_level)                          level=numeric_level)
213    
214  if os.path.isfile(UNDERPOP_NODES_FILE):  if os.path.isfile(MCFLOAT_CONFIG_FILE):
215      pkl_file = open(UNDERPOP_NODES_FILE,'rb')      pkl_file = open(MCFLOAT_CONFIG_FILE,'rb')
216        cfg = pickle.load(pkl_file)
217        pkl_file.close()
218    else:
219        cfg = mcfloat_cfg() # default values
220    
221    config_changed = False
222    
223    # bundle all config changers here
224    
225    if opts.newfrac:
226        cfg.capacityfrac = float(opts.newfrac)
227        if cfg.capacityfrac < 0 or cfg.capacityfrac > 1:
228            raise ValueError('Invalid capacity fraction: %s' % opts.newfrac)
229        config_changed = True
230    
231    ## below after all config changers
232    
233    if config_changed:
234        logging.info("writing out changed mcfloat config to config file")
235        if not opts.noopt:
236            pkl_file = open(MCFLOAT_CONFIG_FILE,'wb')
237            pickle.dump(cfg,pkl_file)
238            pkl_file.close()
239        sys.exit(0)
240    
241    if opts.delall:
242        cfrac = 1
243    else:
244        cfrac = cfg.capacityfrac
245    
246    NUM_CAR  = int(cfrac * 96)
247    NUM_KNAL = int(cfrac * 18)
248    NUM_MARS = int(cfrac * 58)
249    
250    CANDIDATE_NODES = [ 'wn-car-0%02d.farm.nikhef.nl' % (n)  for n in range( 1, NUM_CAR+1) ] + \
251                      [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range( 1, NUM_KNAL+1) ] + \
252                      [ 'wn-mars-0%02d.farm.nikhef.nl' % (n) for n in range( 1, NUM_MARS+1) ]
253    
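With the default capacityfrac of 0.47 from the config class, the counts above come out to int(0.47*96) = 45 car, int(0.47*18) = 8 knal and int(0.47*58) = 27 mars nodes, i.e. 80 candidate hostnames; with -D the fraction is forced to 1 so every in-service node is considered. The fraction can be changed persistently with the new -f option, after which the script just rewrites the pickled config and exits. A quick illustrative check of the hostname arithmetic (standalone, no Torque access needed):

cfrac = 0.47                            # cfg.capacityfrac default; -D effectively uses 1.0
num_car, num_knal, num_mars = int(cfrac * 96), int(cfrac * 18), int(cfrac * 58)
candidates = [ 'wn-car-0%02d.farm.nikhef.nl' % (n)  for n in range(1, num_car + 1) ] + \
             [ 'wn-knal-0%02d.farm.nikhef.nl' % (n) for n in range(1, num_knal + 1) ] + \
             [ 'wn-mars-0%02d.farm.nikhef.nl' % (n) for n in range(1, num_mars + 1) ]
print len(candidates), candidates[0], candidates[-1]
# prints: 80 wn-car-001.farm.nikhef.nl wn-mars-027.farm.nikhef.nl
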
254    if os.path.isfile(cfg.underpop_nodes_file):
255        pkl_file = open(cfg.underpop_nodes_file,'rb')
256      nodes_too_few_jobs_last_run = pickle.load(pkl_file)      nodes_too_few_jobs_last_run = pickle.load(pkl_file)
257      pkl_file.close()      pkl_file.close()
258  else:  else:
# Line 251  else: Line 261  else:
261  logging.debug("nodes from pickle file, marked from last run: " + \  logging.debug("nodes from pickle file, marked from last run: " + \
262                repr(nodes_too_few_jobs_last_run) )                repr(nodes_too_few_jobs_last_run) )
263    
264    from torque_utils import pbsnodes
265    
266  wnodes     = pbsnodes()  wnodes     = pbsnodes()
267  mcnodes    = list()  mcnodes    = list()
268  waiting    = None  waiting    = None
# Line 271  for node in mcnodes: Line 283  for node in mcnodes:
283          if node.freeCpu > 0 : draining_nodes += 1          if node.freeCpu > 0 : draining_nodes += 1
284    
285  if opts.info:  if opts.info:
286      waiting, node_tuples = getmcjobinfo(mcdedicated)      waiting, node_tuples = getmcjobinfo(mcdedicated,cfg)
287      for t in node_tuples:      for t in node_tuples:
288          if t[2] == 0:          if t[2] == 0:
289              print "%28s has %2d running mc jobs, %2d empty slots" % (t[0], t[2], t[3])              print "%28s has %2d running mc jobs, %2d empty slots" % (t[0], t[2], t[3])
# Line 283  for n in mcnodes: Line 295  for n in mcnodes:
295      if n not in mcdedicated:      if n not in mcdedicated:
296          candidate_nodes.append(n)          candidate_nodes.append(n)
297    
298    if opts.delall:
299        for grabbed_node in mcdedicated:
300            logging.info("%s deleted from mc node pool" % (grabbed_node.name))
301            if not opts.noopt: remove_from_mc_pool(grabbed_node)
302        sys.exit(0)
303    
304  if opts.addall:  if opts.addall:
305      for grabbed_node in candidate_nodes:      for grabbed_node in candidate_nodes:
306          logging.info("%s added to mc node pool" % (grabbed_node.name))          logging.info("%s added to mc node pool" % (grabbed_node.name))
# Line 296  nodes_with_too_few_jobs = list() Line 314  nodes_with_too_few_jobs = list()
314    
315  for node in mcdedicated:  for node in mcdedicated:
316      logging.debug(node.name + " has " + repr(node.freeCpu) + " free slots")      logging.debug(node.name + " has " + repr(node.freeCpu) + " free slots")
317      if node.freeCpu > MAXFREEPERNODE:      if node.freeCpu > cfg.maxfreepernode:
318          nodes_with_too_few_jobs.append(node)          nodes_with_too_few_jobs.append(node)
319    
320  logging.debug("there are " + repr(len(nodes_with_too_few_jobs)) + \  logging.debug("there are " + repr(len(nodes_with_too_few_jobs)) + \
# Line 328  if undcount > 0: Line 346  if undcount > 0:
346    
347  # find out how many running mc jobs each node has  # find out how many running mc jobs each node has
348    
349      waiting, node_tuples = getmcjobinfo(nodes_consistently_underpopulated)      waiting, node_tuples = getmcjobinfo(nodes_consistently_underpopulated,cfg)
350    
351      for node_t in node_tuples[:remcount]:      for node_t in node_tuples[:remcount]:
352          nname, nnode, running_mc, unused_slots = node_t          nname, nnode, running_mc, unused_slots = node_t
# Line 348  for n in nodes_with_too_few_jobs: Line 366  for n in nodes_with_too_few_jobs:
366      logging.debug(n.name)      logging.debug(n.name)
367      namelist.append(n.name)      namelist.append(n.name)
368  if not opts.noopt:  if not opts.noopt:
369      pkl_file = open(UNDERPOP_NODES_FILE,'wb')      pkl_file = open(cfg.underpop_nodes_file,'wb')
370      pickle.dump(namelist,pkl_file)      pickle.dump(namelist,pkl_file)
371      pkl_file.close()      pkl_file.close()
372  if len(nodes_with_too_few_jobs) > 0 or removed_a_node :  if len(nodes_with_too_few_jobs) > 0 or removed_a_node :
# Line 359  if len(nodes_with_too_few_jobs) > 0 or r Line 377  if len(nodes_with_too_few_jobs) > 0 or r
377    
378  logging.debug("There are " + repr(len(mcdedicated)) + " dedicated nodes and " + \  logging.debug("There are " + repr(len(mcdedicated)) + " dedicated nodes and " + \
379      repr(draining_slots) + " unused slots")      repr(draining_slots) + " unused slots")
380  if (MAXFREE - draining_slots) >= MAXFREEPERNODE:  if (cfg.maxfree - draining_slots) >= cfg.maxfreepernode:
381      logging.debug("%d unused slots are permitted, so %d more" % (MAXFREE, MAXFREE - draining_slots))      logging.debug("%d unused slots are permitted, so %d more" % (cfg.maxfree, cfg.maxfree - draining_slots))
382      logging.debug("headroom of more than %d+ slots means we can try to grab another node" % (MAXFREEPERNODE) )      logging.debug("headroom of more than %d+ slots means we can try to grab another node" % (cfg.maxfreepernode) )
383      if draining_nodes < MAXDRAIN :      if draining_nodes < cfg.maxdrain :
384          # first check if there are actually any waiting jobs to run; if not makes no sense to grab a node.          # first check if there are actually any waiting jobs to run; if not makes no sense to grab a node.
385          waiting, node_tuples = getmcjobinfo(mcdedicated)          waiting, node_tuples = getmcjobinfo(mcdedicated,cfg)
386          if not waiting:          if not waiting:
387              logging.debug("No waiting jobs found, nothing to do")              logging.debug("No waiting jobs found, nothing to do")
388              sys.exit(0)              sys.exit(0)
389          logging.debug("max %d node(s) with unused slots permitted, %d seen" % \          logging.debug("max %d node(s) with unused slots permitted, %d seen" % \
390                        (MAXDRAIN, draining_nodes))                        (cfg.maxdrain, draining_nodes))
391          logging.debug("there are also waiting jobs: try to grab another node")          logging.debug("there are also waiting jobs: try to grab another node")
392          # build a list of candidates to grab          # build a list of candidates to grab
393          logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))          logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
394          if len(candidate_nodes) < 1:          if len(candidate_nodes) < 1:
395              logging.debug("no more nodes, bailing out, nothing more I can do")              logging.debug("no more nodes, bailing out, nothing more I can do")
396              sys.exit(0)              sys.exit(0)
397          for nn in range(min(NODESPERGRAB,len(candidate_nodes))):          for nn in range(min(cfg.nodespergrab,len(candidate_nodes))):
398  #            logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))  #            logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes))
399              grabbed_node=random.choice(candidate_nodes)              grabbed_node=random.choice(candidate_nodes)
400              logging.info("%s added to mc node pool" % (grabbed_node.name))              logging.info("%s added to mc node pool" % (grabbed_node.name))
# Line 384  if (MAXFREE - draining_slots) >= MAXFREE Line 402  if (MAXFREE - draining_slots) >= MAXFREE
402              if not opts.noopt: add_to_mc_pool(grabbed_node)              if not opts.noopt: add_to_mc_pool(grabbed_node)
403      else:      else:
404          logging.debug("There are %d nodes with unused slots (draining)" % (draining_nodes))          logging.debug("There are %d nodes with unused slots (draining)" % (draining_nodes))
405          logging.debug("This equals or exceeds the configured max of %d" % (MAXDRAIN))          logging.debug("This equals or exceeds the configured max of %d" % (cfg.maxdrain))
406          logging.debug("Doing nothing now")          logging.debug("Doing nothing now")
407    
408  elif draining_slots > MAXFREE:  elif draining_slots > cfg.maxfree:
409    
410      # find out how many running mc jobs each node has      # find out how many running mc jobs each node has
411    
412      waiting, node_tuples = getmcjobinfo(mcdedicated)      waiting, node_tuples = getmcjobinfo(mcdedicated,cfg)
413    
414      slots_to_recover = draining_slots - MAXFREE      slots_to_recover = draining_slots - cfg.maxfree
415      logging.info("unused slot limit (%d) exceeded by %d " \      logging.info("unused slot limit (%d) exceeded by %d " \
416                   % (MAXFREE, slots_to_recover) + ": remove node(s) from mc pool")                   % (cfg.maxfree, slots_to_recover) + ": remove node(s) from mc pool")
417      slots_recovered = 0      slots_recovered = 0
418      while slots_recovered < slots_to_recover:      while slots_recovered < slots_to_recover:
419          node_t = node_tuples.pop(0)          node_t = node_tuples.pop(0)
# Line 406  elif draining_slots > MAXFREE: Line 424  elif draining_slots > MAXFREE:
424              if not opts.noopt : remove_from_mc_pool(nnode)              if not opts.noopt : remove_from_mc_pool(nnode)
425              slots_recovered += unused_slots              slots_recovered += unused_slots
426  else:  else:
427      logging.debug("%d unused slots of allowed %d" % (draining_slots, MAXFREE))      logging.debug("%d unused slots of allowed %d" % (draining_slots, cfg.maxfree))
428      logging.debug("difference is %d which is less than %d" % \      logging.debug("difference is %d which is less than %d" % \
429                    (MAXFREE - draining_slots, MAXFREEPERNODE) )                    (cfg.maxfree - draining_slots, cfg.maxfreepernode) )
430      logging.debug("so: doing nothing now.")      logging.debug("so: doing nothing now.")
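For orientation, the branch structure above with the defaults from the config class (maxfree 49, maxfreepernode 3, maxdrain 32): 40 draining slots leaves 49-40 = 9 >= 3 of headroom, so up to nodespergrab more nodes are grabbed provided fewer than 32 nodes are draining and mc jobs are actually waiting; 55 draining slots exceeds 49, so nodes are handed back until at least 6 slots are recovered; 48 draining slots (headroom 1 < 3, and 48 <= 49) falls through to the final branch and nothing happens. A condensed, illustrative re-statement of just that branch selection (not the script's own code, which also queries Torque before acting):

def mcfloat_branch(draining_slots, draining_nodes,
                   maxfree=49, maxfreepernode=3, maxdrain=32):
    # mirrors the if/elif/else above using the mcfloat_cfg defaults
    if (maxfree - draining_slots) >= maxfreepernode:
        if draining_nodes < maxdrain:
            return "grab up to nodespergrab nodes (if mc jobs are waiting)"
        return "enough nodes draining already: do nothing"
    elif draining_slots > maxfree:
        return "remove nodes until %d slots are recovered" % (draining_slots - maxfree)
    return "within limits: do nothing"

print mcfloat_branch(40, 10)   # headroom 9 >= 3      -> grab
print mcfloat_branch(55, 10)   # 55 > 49 (over by 6)  -> recover slots
print mcfloat_branch(48, 10)   # headroom 1 < 3       -> nothing
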
