#!/usr/bin/env python # $Id$ # Source: $URL$ # J. A. Templon, NIKHEF/PDP 2014 # Class to hold parameters: class mcfloat_cfg: def __init__(self): # default values; adjust for your site self.maxdrain = 32 self.maxfree = 49 self.maxfreepernode = 3 self.nodespergrab = 3 self.capacityfrac = 0.48 self.mcqueue = 'atlasmc7' self.underpop_nodes_file = "/var/db/nodes_with_too_few_jobs" self.torque = "korf.nikhef.nl" def __repr__(self): return ("Capacity Fraction %4.2f\nMax Drain %d\nMax Free %d\n" + \ "Max Free per Node %d") % (self.capacityfrac, self.maxdrain, self.maxfree, self.maxfreepernode ) MCFLOAT_CONFIG_FILE = "/var/db/mcfloat_config" # the above config allows mcore to run up to full ATLAS allocation (07.04.2015) # full allocation right now is 47% of "non-retired" capacity = knal + mars + car # just configure 47% of each class. # override when lsgrid is running (legacy comments, might be useful) # MAXFREEPERNODE = 29 # allow lsgrid 30-core jobs # MAXFREE = 149 # max num of free slots to tolerate # override when bbmri is running # MAXFREE = 149 # max num of free slots to tolerate # MAXFREEPERNODE = 9 # allow bbmri 10-core jobs import os import pickle import sys import subprocess def getmcjobinfo(nodes, cfg): # input is the list of nodes for which to get info # function returns tuple (jobs_waiting, node_info) # jobs_waiting boolean value : are there MC jobs waiting to be run (state Q)? (independent of nodes!) # node_info: list of tuples (wn_name, wn, num_running_mc, num_free_slots) # where wn is the node object for the machine with name wn_name, num_running_mc is the number of # mc jobs running on that node, and num_free_slots is the number of unused job slots on that node usedkeys = ['egroup', 'jobid', 'queue', 'job_state', 'euser', 'exec_host' ] import torqueJobs import torqueAttMappers as tam import tempfile qsfname = tempfile.mktemp(".txt","qsmf",os.environ['HOME']+"/tmp") import time os.system('/usr/bin/qstat -f @' + cfg.torque + ' > ' + qsfname) now = time.mktime(time.localtime()) jlist = torqueJobs.qs_parsefile(qsfname) os.system('mv ' + qsfname + ' ' + os.environ['HOME'] + '/tmp/qstat.last.txt') def mapatts(indict,inkeys): sdict = tam.sub_dict(indict,inkeys) odict = dict() for k in sdict.keys(): if k in tam.tfl and sdict[k]: secs = tam.hms(sdict[k]) sdict[k] = secs/3600. if k in tam.mfl and sdict[k]: mebi = tam.memconvert(sdict[k]) sdict[k] = mebi if k in tam.tfl2 and sdict[k]: secs = tam.tconv(sdict[k]) sdict[k] = secs elif k == 'job_state': statelett = sdict[k] if statelett in ['Q','W']: sdict[k] = 'queued' elif statelett in ['R','E']: sdict[k] = 'running' elif k == 'exec_host' and sdict[k]: termpos = sdict[k].find('/') wnstring = sdict[k][:termpos] sdict[k] = wnstring if sdict[k]: odict[k] = sdict[k] return odict mcjoblist = list() waitingJobs = False for j in jlist: if j['queue'] == cfg.mcqueue: newj = mapatts(j,usedkeys) mcjoblist.append(newj) if newj['job_state'] == 'queued': waitingJobs = True # find out how many running mc jobs each node has running_mc = dict() for n in nodes: running_mc[n.name] = 0 for j in mcjoblist: if j['job_state'] =='running': wn = j['exec_host'] if wn in running_mc.keys(): running_mc[wn] += 1 rawlist = list() for n in nodes: rawlist.append( (n.name, n, running_mc[n.name], n.freeCpu) ) from operator import itemgetter def srtrunningfree(item): # sort optimal for dropping nodes; for nodes with zero running mc jobs, want to # drop least-drained nodes (fewer free slots first in list); for nodes with # running mc jobs, want to shed as few nodes as possible so pick the more-drained # nodes first. this probably also works for shedding nodes when queue dries up. mcrun = item[2] emptyslots = item[3] if mcrun == 0: rank = emptyslots else: # the original "optimal" order: rank = 32*mcrun - emptyslots # 32 just a number bigger than expected emptyslots value rank = 7168 - 224*float(emptyslots)/mcrun + mcrun # constants 7168 and 224 only serve to enable a multi-level sort. return rank slist = sorted(rawlist, key=srtrunningfree) return waitingJobs, slist def remove_from_mc_pool(node): # print "adding el7 tag back to node", node.name if "el7" not in node.properties : proc = subprocess.Popen(['/usr/bin/qmgr'], stdin=subprocess.PIPE, stdout=subprocess.PIPE,) proc.stdin.write( 'set node ' + node.name + \ ' properties += el7\n' ) proc.stdin.write( 'print node ' + node.name + \ ' properties\n' ) out = proc.communicate()[0] # print out if "mc" in node.properties : proc = subprocess.Popen(['/usr/bin/qmgr'], stdin=subprocess.PIPE, stdout=subprocess.PIPE,) proc.stdin.write( 'set node ' + node.name + \ ' properties -= mc\n' ) proc.stdin.write( 'print node ' + node.name + \ ' properties\n' ) out = proc.communicate()[0] # print out def add_to_mc_pool(node): # print "node props b4:", grabbed_node.properties if "el7" in node.properties : proc = subprocess.Popen(['/usr/bin/qmgr'], stdin=subprocess.PIPE, stdout=subprocess.PIPE,) proc.stdin.write( 'set node ' + node.name + \ ' properties -= el7\n' ) proc.stdin.write( 'print node ' + node.name + \ ' properties\n' ) out = proc.communicate()[0] if "mc" not in node.properties : proc = subprocess.Popen(['/usr/bin/qmgr'], stdin=subprocess.PIPE, stdout=subprocess.PIPE,) proc.stdin.write( 'set node ' + node.name + \ ' properties += mc\n' ) proc.stdin.write( 'print node ' + node.name + \ ' properties\n' ) out = proc.communicate()[0] import optparse usage = "usage: %prog [options]" p = optparse.OptionParser(description="Monitor state of multicore pool and adjust size as needed", usage=usage) p.add_option("-A",action="store_true",dest="addall",default=False, help="add all eligible nodes to multicore pool") p.add_option("-D",action="store_true",dest="delall",default=False, help="delete all nodes from multicore pool") p.add_option("-E",action="store_true",dest="add_everything",default=False, help="Adds all nodes (even non-candidates) to the multicore pool") p.add_option("-R",action="store_true",dest="remove_strays_from_pool",default=False, help="remove mc pool nodes that are not one of the configured candidate nodes") p.add_option("-l",action="store",dest="logfile",default=None, help="log actions and information to LOGFILE (default stdout)") p.add_option("-L",action="store",dest="loglevel",default="INFO", help="print messages of LOGLEVEL (DEBUG, INFO, WARNING, ..., default INFO") p.add_option("-n",action="store_true",dest="noopt",default=False, help="don't do anything, just print what would have been done") p.add_option("-i",action="store_true",dest="info",default=False, help="print info on all nodes in multicore pool") p.add_option("-d",action="store",dest="new_max_drain",default=None, help="maximum number of nodes allowed in draining state") p.add_option("-m",action="store",dest="new_max_free",default=None, help="maximum number of unused slots to tolerate in the multicore pool") p.add_option("-p",action="store",dest="new_max_free_per_node",default=None, help="maximum number of unused slots per node to tolerate in the multicore pool") p.add_option("-f",action="store",dest="newfrac",default=None, help="fraction (out of 1.0) of in-service nodes to commit to multicore pool") p.add_option("-r",action="store_true",dest="reset_config",default=False, help="reset mcfloat config to the default") p.add_option("-q",action="store_true",dest="querycfg",default=False, help="print current config and exit") import logging opts, args = p.parse_args() # assuming loglevel is bound to the string value obtained from the # command line argument. Convert to upper case to allow the user to # specify --log=DEBUG or --log=debug numeric_level = getattr(logging, opts.loglevel.upper(), None) if not isinstance(numeric_level, int): raise ValueError('Invalid log level: %s' % loglevel) if opts.logfile: logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=numeric_level,filename=opts.logfile) else: logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=numeric_level) if os.path.isfile(MCFLOAT_CONFIG_FILE): pkl_file = open(MCFLOAT_CONFIG_FILE,'rb') cfg = pickle.load(pkl_file) pkl_file.close() else: cfg = mcfloat_cfg() # default values if opts.querycfg: print "Current mcfloat configuration:" print cfg sys.exit(0) config_changed = False # bundle all config changers here if opts.newfrac: cfg.capacityfrac = float(opts.newfrac) if cfg.capacityfrac < 0 or cfg.capacityfrac > 1: raise ValueError('Invalid capacity fraction: %s' % opts.newfrac) logging.info("Changing capacity fraction to %s" % opts.newfrac) config_changed = True if opts.new_max_free: cfg.maxfree = int(opts.new_max_free) if cfg.maxfree < 0: raise ValueError('Invalid max free slots: %s' % opts.new_max_free) logging.info("Changing max free slots to %s" % opts.new_max_free) config_changed = True if opts.new_max_free_per_node: cfg.maxfreepernode = int(opts.new_max_free_per_node) if cfg.maxfreepernode < 0: raise ValueError('Invalid max free slots per node: %s' % opts.new_max_free_per_node) logging.info("Changing max free slots per node to %s" % opts.new_max_free_per_node) config_changed = True if opts.new_max_drain: cfg.maxdrain = int(opts.new_max_drain) if cfg.maxdrain < 0: raise ValueError('Invalid max num draining nodes: %s' % opts.new_max_drain) logging.info("Changing max number draining nodes to %s" % opts.new_max_drain) config_changed = True if opts.reset_config: cfg = mcfloat_cfg() # default values logging.info("Reset mcfloat config to default values") config_changed = True ## below after all config changers if config_changed: logging.info("writing out changed mcfloat config to config file") if not opts.noopt: pkl_file = open(MCFLOAT_CONFIG_FILE,'wb') pickle.dump(cfg,pkl_file) pkl_file.close() sys.exit(0) if opts.delall: cfrac = 1 else: cfrac = cfg.capacityfrac # NUM_CAR = int(cfrac * 96) # NUM_KNAL = int(cfrac * 18) # NUM_MARS = int(cfrac * 58) NUM_CHOC = int(cfrac * 58) NUM_PEP = int(cfrac * 13) NUM_SATE = int(cfrac * 54) NUM_TAAI = int(cfrac * 12) NUM_LOT = int(cfrac * 48) # lots start at 010 and go to 057 NUM_SNEL = int(cfrac * 32) # there is no check whether START + NUM < number of nodes # so careful when changing START from 1 # START_CAR = 1 # START_KNAL = 1 # START_MARS = 1 START_CHOC = 1 START_PEP = 1 START_SATE = 1 START_TAAI = 1 START_LOT = 10 START_SNEL = 1 # RANGE_CAR = range(START_CAR, START_CAR + NUM_CAR + 1) # RANGE_KNAL = range(START_KNAL, START_KNAL + NUM_KNAL) # RANGE_MARS = range(START_MARS, START_MARS + NUM_MARS) RANGE_CHOC = range(START_CHOC, START_CHOC + NUM_CHOC) RANGE_PEP = range(START_PEP, START_PEP + NUM_PEP) RANGE_SATE = range(START_SATE, START_SATE + NUM_SATE) RANGE_TAAI = range(START_TAAI, START_TAAI + NUM_TAAI) RANGE_LOT = range(START_LOT, START_LOT + NUM_LOT) RANGE_SNEL = range(START_SNEL, START_SNEL + NUM_SNEL) CANDIDATE_NODES = [ 'wn-choc-0%02d.farm.nikhef.nl' % (n) for n in RANGE_CHOC ] + \ [ 'wn-pep-0%02d.farm.nikhef.nl' % (n) for n in RANGE_PEP ] + \ [ 'wn-sate-0%02d.farm.nikhef.nl' % (n) for n in RANGE_SATE ] + \ [ 'wn-taai-0%02d.farm.nikhef.nl' % (n) for n in RANGE_TAAI ] + \ [ 'wn-lot-0%02d.farm.nikhef.nl' % (n) for n in RANGE_LOT ] + \ [ 'wn-snel-0%02d.farm.nikhef.nl' % (n) for n in RANGE_SNEL ] from torque_utils import pbsnodes wnodes = pbsnodes() mcnodes = list() waiting = None node_tuples = None if opts.add_everything: for gn in [n for n in wnodes if 'offline' not in n.state if ('mc' not in n.properties and 'el7' in n.properties)] : logging.info("%s added to mc node pool" % (gn.name)) # candidate_nodes.remove(grabbed_node) if not opts.noopt: add_to_mc_pool(gn) sys.exit(0) if opts.remove_strays_from_pool: for node in wnodes: if node.name not in CANDIDATE_NODES and 'mc' in node.properties : if not opts.noopt: remove_from_mc_pool(node) logging.info("%s deleted from mc node pool" % (node.name)) sys.exit(0) for node in wnodes: if node.name in CANDIDATE_NODES and (node.state.count('offline') == 0 and ('mc' in node.properties or 'el7' in node.properties)): mcnodes.append(node) if os.path.isfile(cfg.underpop_nodes_file): pkl_file = open(cfg.underpop_nodes_file,'rb') nodes_too_few_jobs_last_run = pickle.load(pkl_file) pkl_file.close() else: nodes_too_few_jobs_last_run = list() logging.debug("nodes from pickle file, marked from last run: " + \ repr(nodes_too_few_jobs_last_run) ) draining_slots = 0 mcdedicated = list() draining_nodes = 0 for node in wnodes: if 'el7' not in node.properties and 'mc' in node.properties : mcdedicated.append(node) draining_slots += node.freeCpu if node.freeCpu > 0 : draining_nodes += 1 if opts.info: waiting, node_tuples = getmcjobinfo(mcdedicated,cfg) for t in node_tuples: if t[2] == 0: print "%28s has %2d running mc jobs, %2d empty slots" % (t[0], t[2], t[3]) else: print "%28s has %2d running mc jobs, %2d empty slots, ratio %4.1f" % (t[0], t[2], t[3], float(t[3])/t[2]) sys.exit(0) candidate_nodes = list() for n in mcnodes: if n not in mcdedicated: candidate_nodes.append(n) if opts.delall: for grabbed_node in wnodes: if 'mc' in grabbed_node.properties : logging.info("%s deleted from mc node pool" % (grabbed_node.name)) if not opts.noopt: remove_from_mc_pool(grabbed_node) sys.exit(0) if opts.addall: for grabbed_node in candidate_nodes: logging.info("%s added to mc node pool" % (grabbed_node.name)) # candidate_nodes.remove(grabbed_node) if not opts.noopt: add_to_mc_pool(grabbed_node) sys.exit(0) # check for dedicated nodes with too few jobs nodes_with_too_few_jobs = list() for node in mcdedicated: logging.debug(node.name + " has " + repr(node.freeCpu) + " free slots") if node.freeCpu > cfg.maxfreepernode: nodes_with_too_few_jobs.append(node) logging.debug("there are " + repr(len(nodes_with_too_few_jobs)) + \ " nodes with too few jobs") nodes_consistently_underpopulated = list() for n in nodes_with_too_few_jobs: if n.name in nodes_too_few_jobs_last_run: nodes_consistently_underpopulated.append(n) undcount = len(nodes_consistently_underpopulated) if undcount > 0: logging.debug("there are " + repr(undcount) + \ " nodes with too few jobs that were also marked last run" ) import random removed_a_node = False if undcount > 0: logging.info("nodes consistently underpopulated:") for n in nodes_consistently_underpopulated: logging.info(" " + n.name) if undcount > 1: remcount = undcount / 2 # number to remove logging.info("going to remove " + repr(remcount) + " from mc pool") else: remcount = 1 # find out how many running mc jobs each node has waiting, node_tuples = getmcjobinfo(nodes_consistently_underpopulated,cfg) for node_t in node_tuples[:remcount]: nname, nnode, running_mc, unused_slots = node_t logging.info("dumped %d empty slots, %d mc jobs on %s" % \ (unused_slots, running_mc, nname) ) if not opts.noopt : remove_from_mc_pool(nnode) nodes_with_too_few_jobs.remove(nnode) nodes_consistently_underpopulated.remove(nnode) removed_a_node = True namelist = list() if len(nodes_with_too_few_jobs) > 0: logging.debug("placing %d nodes " % (len(nodes_with_too_few_jobs)) + \ "with too few jobs on list for next run:") for n in nodes_with_too_few_jobs: logging.debug(n.name) namelist.append(n.name) if not opts.noopt: pkl_file = open(cfg.underpop_nodes_file,'wb') pickle.dump(namelist,pkl_file) pkl_file.close() if len(nodes_with_too_few_jobs) > 0 or removed_a_node : sys.exit(0) # if we survive to this point, all nodes 'dedicated' are being efficiently # used. are we draining less than the configured max? logging.debug("There are " + repr(len(mcdedicated)) + " dedicated nodes and " + \ repr(draining_slots) + " unused slots") if (cfg.maxfree - draining_slots) >= cfg.maxfreepernode: logging.debug("%d unused slots are permitted, so %d more" % (cfg.maxfree, cfg.maxfree - draining_slots)) logging.debug("headroom of more than %d+ slots means we can try to grab another node" % (cfg.maxfreepernode) ) if draining_nodes < cfg.maxdrain : # first check if there are actually any waiting jobs to run; if not makes no sense to grab a node. waiting, node_tuples = getmcjobinfo(mcdedicated,cfg) if not waiting: logging.debug("No waiting jobs found, nothing to do") sys.exit(0) logging.debug("max %d node(s) with unused slots permitted, %d seen" % \ (cfg.maxdrain, draining_nodes)) logging.debug("there are also waiting jobs: try to grab another node") # build a list of candidates to grab logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes)) if len(candidate_nodes) < 1: logging.debug("no more nodes, bailing out, nothing more I can do") sys.exit(0) for nn in range(min(cfg.nodespergrab,len(candidate_nodes))): # logging.debug("found %d candidate nodes to dedicate to mc" % len(candidate_nodes)) grabbed_node=random.choice(candidate_nodes) logging.info("%s added to mc node pool" % (grabbed_node.name)) candidate_nodes.remove(grabbed_node) if not opts.noopt: add_to_mc_pool(grabbed_node) else: logging.debug("There are %d nodes with unused slots (draining)" % (draining_nodes)) logging.debug("This equals or exceeds the configured max of %d" % (cfg.maxdrain)) logging.debug("Doing nothing now") elif draining_slots > cfg.maxfree: # find out how many running mc jobs each node has waiting, node_tuples = getmcjobinfo(mcdedicated,cfg) slots_to_recover = draining_slots - cfg.maxfree logging.info("unused slot limit (%d) exceeded by %d " \ % (cfg.maxfree, slots_to_recover) + ": remove node(s) from mc pool") slots_recovered = 0 while slots_recovered < slots_to_recover: node_t = node_tuples.pop(0) nname, nnode, running_mc, unused_slots = node_t if unused_slots > 0: logging.info("dumped %d empty slots, %d mc jobs on %s" % \ (unused_slots, running_mc, nname) ) if not opts.noopt : remove_from_mc_pool(nnode) slots_recovered += unused_slots else: logging.debug("%d unused slots of allowed %d" % (draining_slots, cfg.maxfree)) logging.debug("difference is %d which is less than %d" % \ (cfg.maxfree - draining_slots, cfg.maxfreepernode) ) logging.debug("so: doing nothing now.")