Skip to content
Snippets Groups Projects
dist_tasks.c 32.33 KiB
/*****************************************************************************\
 *  Copyright (C) 2006-2009 Hewlett-Packard Development Company, L.P.
 *  Copyright (C) 2008-2009 Lawrence Livermore National Security.
 *  Written by Susanne M. Balle, <susanne.balle@hp.com>
 *  CODE-OCEC-09-009. All rights reserved.
 *  
 *  This file is part of SLURM, a resource management program.
 *  For details, see <https://computing.llnl.gov/linux/slurm/>.
 *  Please also read the included file: DISCLAIMER.
 *  
 *  SLURM is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *  
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version.  If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *  
 *  You should have received a copy of the GNU General Public License along
 *  with SLURM; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
\*****************************************************************************/

#include "affinity.h"
#include "dist_tasks.h"
#include "src/common/bitstring.h"
#include "src/common/log.h"
#include "src/common/slurm_cred.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_resource_info.h"
#include "src/common/xmalloc.h"
#include "src/slurmd/slurmd/slurmd.h"

#ifdef HAVE_NUMA
#include <numa.h>
#endif

/*
 * Forward declarations for file-local helpers that are called before
 * their definitions appear further down in this file.
 */

/* Summarize this node's allocation: whole/partial entity counts through
 * the OUT pointers plus an xmalloc'd mask string as the return value. */
static char *_alloc_mask(launch_tasks_request_msg_t *req,
			 int *whole_node_cnt, int *whole_socket_cnt, 
			 int *whole_core_cnt, int *whole_thread_cnt,		 				 int *part_socket_cnt, int *part_core_cnt);
/* Build the bitmap of logical CPUs the step may use on this node and
 * report the node's socket/core/thread geometry taken from conf. */
static bitstr_t *_get_avail_map(launch_tasks_request_msg_t *req,
				uint16_t *hw_sockets, uint16_t *hw_cores,
				uint16_t *hw_threads);
/* Locate a node inside the credential's core_bitmap: returns the index of
 * the node's first core and its sockets / cores-per-socket counts. */
static int _get_local_node_info(slurm_cred_arg_t *arg, uint32_t job_node_id,
				uint16_t *sockets, uint16_t *cores);

/* Per-task mask generators, one per distribution flavor. Each one stores
 * a newly allocated array of masks through masks_p. */
static int _task_layout_lllp_block(launch_tasks_request_msg_t *req,
				   uint32_t node_id, bitstr_t ***masks_p);
static int _task_layout_lllp_cyclic(launch_tasks_request_msg_t *req,
				    uint32_t node_id, bitstr_t ***masks_p);
static int _task_layout_lllp_multi(launch_tasks_request_msg_t *req, 
				    uint32_t node_id, bitstr_t ***masks_p);

/* Post-processing of the generated masks: remap the bit indices through
 * BLOCK_MAP, then render the masks into req->cpu_bind. */
static void _lllp_map_abstract_masks(const uint32_t maxtasks,
				     bitstr_t **masks);
static void _lllp_generate_cpu_bind(launch_tasks_request_msg_t *req,
				    const uint32_t maxtasks,
				    bitstr_t **masks);

/* Index translation through the block maps held in conf, performed by
 * _block_map() (identity when the selected map is NULL).
 *
 *     BLOCK_MAP     looks the index up in conf->block_map; this is the
 *                   macro _lllp_map_abstract_mask() uses to turn an
 *                   abstract block LLLP index into a physical machine
 *                   LLLP index
 *     BLOCK_MAP_INV looks the index up in conf->block_map_inv
 *                   NOTE(review): presumably the inverse direction
 *                   (physical machine index -> abstract block index);
 *                   nothing in this file uses it - confirm against the
 *                   code that fills conf->block_map_inv
 */
#define BLOCK_MAP(index)	_block_map(index, conf->block_map)
#define BLOCK_MAP_INV(index)	_block_map(index, conf->block_map_inv)


/* _block_map
 *
 * Safely returns a mapped index using a provided block map.
 *
 * IN index - index to map
 * IN map   - map to use (conf->block_map_size entries); NULL selects the
 *            identity mapping
 * RET map[index], where an out-of-range index is first wrapped modulo
 *     conf->block_map_size; index itself when there is no usable map
 */
static uint16_t _block_map(uint16_t index, uint16_t *map)
{
	if (map == NULL) {
		return index;
	}
	/* An empty map cannot translate anything, and wrapping an index
	 * into it below would be a division by zero. */
	if (conf->block_map_size == 0) {
		return index;
	}
	/* make sure bit falls in map */
	if (index >= conf->block_map_size) {
		debug3("wrapping index %u into block_map_size of %u",
		       index, conf->block_map_size);
		index = index % conf->block_map_size;
	}
	return map[index];
}

/*
 * _task_layout_display_masks
 *
 * Log (at debug3 level) the hex form of every task's mask.
 *
 * IN req      - launch request, used only for its job_id
 * IN gtid     - global task ids, indexed like masks
 * IN maxtasks - number of entries in gtid and masks
 * IN masks    - per-task masks; a NULL array or NULL entries are skipped,
 *               as the other mask helpers in this file do
 */
static void _task_layout_display_masks(launch_tasks_request_msg_t *req, 
					const uint32_t *gtid,
					const uint32_t maxtasks,
					bitstr_t **masks)
{
	uint32_t i;
	char *str = NULL;

	if (!masks)
		return;
	for (i = 0; i < maxtasks; i++) {
		if (!masks[i])
			continue;
		str = (char *)bit_fmt_hexmask(masks[i]);
		/* gtid entries are uint32_t, so print them unsigned */
		debug3("_task_layout_display_masks jobid [%u:%u] %s",
		       req->job_id, gtid[i], str);
		xfree(str);
	}
}

/*
 * _lllp_free_masks
 *
 * Release every mask in the array and then the array itself.
 *
 * IN maxtasks - number of entries in masks
 * IN masks    - array to free; may be NULL (lllp_distribution() passes
 *               NULL when a layout routine fails before allocating the
 *               array) and may contain NULL entries
 */
static void _lllp_free_masks(const uint32_t maxtasks, bitstr_t **masks)
{
	uint32_t i;

	if (!masks)
		return;
	for (i = 0; i < maxtasks; i++) {
		if (masks[i])
			bit_free(masks[i]);
	}
	xfree(masks);
}

#ifdef HAVE_NUMA
/* _match_masks_to_ldom
 *
 * expand each mask to encompass the whole locality domain
 * within which it currently exists
 * NOTE: this assumes that the masks are already in logical
 * (and not abstract) CPU order.
 *
 * IN maxtasks  - number of entries in masks
 * IN/OUT masks - per-task masks, widened in place; NULL entries are
 *                skipped. The first non-NULL mask supplies the size
 *                used for every mask.
 */
static void _match_masks_to_ldom(const uint32_t maxtasks, bitstr_t **masks)
{
	uint32_t i, b, c, size = 0;

	if (!masks)
		return;
	/* take the mask width from the first mask that exists */
	for (i = 0; i < maxtasks; i++) {
		if (masks[i]) {
			size = bit_size(masks[i]);
			break;
		}
	}
	for (i = 0; i < maxtasks; i++) {
		if (!masks[i])
			continue;
		for (b = 0; b < size; b++) {
			if (bit_test(masks[i], b)) {
				/* get the NUMA node for this CPU, and then
				 * set all CPUs in the mask that exist in
				 * the same NUMA node */
				uint16_t nnid = slurm_get_numa_node(b);
				for (c = 0; c < size; c++) {
					if (slurm_get_numa_node(c) == nnid)
						bit_set(masks[i], c);
				}
			}
		}
	}
}
#endif

/* 
 * batch_bind - Set the batch request message so as to bind the shell to the 
 *	proper resources
 *
 * IN/OUT req - batch launch request. On success cpu_bind_type is set to
 *	CPU_BIND_MASK (plus CPU_BIND_VERBOSE when the plugin parameters ask
 *	for it) and cpu_bind to the hex mask of every hardware thread of
 *	each core the credential allocates on node 0 of the job.
 *
 * On failure (no credential, node 0 missing from the credential, bitmap
 * allocation failure, or an empty allocation) an error is logged and the
 * binding fields of req are left untouched.
 */
void batch_bind(batch_job_launch_msg_t *req)
{
	bitstr_t *req_map, *hw_map;
	slurm_cred_arg_t arg;
	uint16_t sockets=0, cores=0, num_procs;
	int hw_size, start, p, t, task_cnt=0;
	char *str;

	if (slurm_cred_get_args(req->cred, &arg) != SLURM_SUCCESS) {
		error("task/affinity: job lacks a credential");
		return;
	}
	start = _get_local_node_info(&arg, 0, &sockets, &cores);
	if (start != 0) {
		error("task/affinity: missing node 0 in job credential");
		slurm_cred_free_args(&arg);
		return;
	}

	hw_size    = conf->sockets * conf->cores * conf->threads;
	num_procs  = MIN((sockets * cores),
			 (conf->sockets * conf->cores));
	req_map = (bitstr_t *) bit_alloc(num_procs);
	hw_map  = (bitstr_t *) bit_alloc(hw_size);
	if (!req_map || !hw_map) {
		error("task/affinity: malloc error");
		if (req_map)
			bit_free(req_map);
		if (hw_map)
			bit_free(hw_map);
		slurm_cred_free_args(&arg);
		/* nothing usable was allocated, so stop here rather than
		 * fall through and operate on released/NULL bitmaps */
		return;
	}

	/* Transfer core_bitmap data to local req_map.
	 * The MOD function handles the case where fewer processes
	 * physically exist than are configured (slurmd is out of 
	 * sync with the slurmctld daemon). */
	for (p = 0; p < (sockets * cores); p++) {
		if (bit_test(arg.core_bitmap, p))
			bit_set(req_map, (p % num_procs));
	}
	str = (char *)bit_fmt_hexmask(req_map);
	debug3("task/affinity: job %u CPU mask from slurmctld: %s",
		req->job_id, str);
	xfree(str);
	for (p = 0; p < num_procs; p++) {
		if (bit_test(req_map, p) == 0)
			continue;
		/* core_bitmap does not include threads, so add every
		 * hardware thread of each allocated core here */
		for (t = 0; t < conf->threads; t++) {
			uint16_t bit = p * conf->threads + t;
			bit_set(hw_map, bit);
			task_cnt++;
		}
	}
	if (task_cnt) {
		req->cpu_bind_type = CPU_BIND_MASK;
		if (conf->task_plugin_param & CPU_BIND_VERBOSE)
			req->cpu_bind_type |= CPU_BIND_VERBOSE;
		req->cpu_bind = (char *)bit_fmt_hexmask(hw_map);
		info("task/affinity: job %u CPU final mask for node: %s",
		     req->job_id, req->cpu_bind);
	} else {
		error("task/affinity: job %u allocated no CPUs", 
		      req->job_id);
	}
	bit_free(hw_map);
	bit_free(req_map);
	slurm_cred_free_args(&arg);
}

/* 
 * lllp_distribution
 *
 * Note: lllp stands for Lowest Level of Logical Processors. 
 *
 * When automatic binding is enabled:
 *      - no binding flags set >= CPU_BIND_NONE, and
 *      - a auto binding level selected CPU_BIND_TO_{SOCKETS,CORES,THREADS}
 * Otherwise limit job step to the allocated CPUs
 *
 * generate the appropriate cpu_bind type and string which results in
 * the specified lllp distribution.
 *
 * Three outcomes are possible:
 *  1. The request carries an explicit binding mode (none/mask/rank/map or
 *     their ldom variants): unless the whole node is allocated, the user
 *     binding is replaced by a mask of the allocated CPUs.
 *  2. No binding entity was requested: one is inferred when the task count
 *     equals the number of whole sockets, cores or threads allocated;
 *     otherwise the step is simply bound to the allocated CPUs.
 *  3. A binding entity is known: per-task masks are laid out according to
 *     req->task_dist and rendered into req->cpu_bind. If the layout fails,
 *     the step falls back to a mask of the allocated CPUs.
 *
 * IN/OUT req  - job launch request (cpu_bind_type and cpu_bind updated;
 *               task_dist is rewritten for unrecognized distributions)
 * IN node_id  - index used to select this node's entries in
 *               req->tasks_to_launch and req->global_task_ids
 */
void lllp_distribution(launch_tasks_request_msg_t *req, uint32_t node_id)
{
	int rc = SLURM_SUCCESS;
	bitstr_t **masks = NULL;
	char buf_type[100];
	int maxtasks = req->tasks_to_launch[(int)node_id];
	int whole_nodes, whole_sockets, whole_cores, whole_threads;
	int part_sockets, part_cores;
	const uint32_t *gtid = req->global_task_ids[(int)node_id];
	uint16_t bind_entity, bind_mode;

	bind_mode = CPU_BIND_NONE   | 
		    CPU_BIND_MASK   | CPU_BIND_RANK   | CPU_BIND_MAP |
		    CPU_BIND_LDMASK | CPU_BIND_LDRANK | CPU_BIND_LDMAP;
	if (req->cpu_bind_type & bind_mode) {	/* explicit user mapping */
		char *avail_mask = _alloc_mask(req,
					       &whole_nodes,  &whole_sockets, 
					       &whole_cores,  &whole_threads,
					       &part_sockets, &part_cores);
		if ((whole_nodes == 0) && avail_mask) {
			/* partial node: confine the step to its own CPUs */
			xfree(req->cpu_bind);
			req->cpu_bind = avail_mask;
			req->cpu_bind_type &= (~bind_mode);
			req->cpu_bind_type |= CPU_BIND_MASK;
		} else
			xfree(avail_mask);
		slurm_sprint_cpu_bind_type(buf_type, req->cpu_bind_type);
		info("lllp_distribution jobid [%u] manual binding: %s",
		     req->job_id, buf_type);
		return;
	}

	bind_entity = CPU_BIND_TO_THREADS | CPU_BIND_TO_CORES |
		      CPU_BIND_TO_SOCKETS | CPU_BIND_TO_LDOMS;
	if (!(req->cpu_bind_type & bind_entity)) {
		uint16_t auto_level = 0;
		char *avail_mask = _alloc_mask(req,
					       &whole_nodes,  &whole_sockets, 
					       &whole_cores,  &whole_threads,
					       &part_sockets, &part_cores);
		debug("binding tasks:%d to "
		      "nodes:%d sockets:%d:%d cores:%d:%d threads:%d",
		      maxtasks, whole_nodes, whole_sockets ,part_sockets,
		      whole_cores, part_cores, whole_threads);

		/* pick the coarsest entity that gives one entity per task */
		if ((maxtasks == whole_sockets) && (part_sockets == 0))
			auto_level = CPU_BIND_TO_SOCKETS;
		else if ((maxtasks == whole_cores) && (part_cores == 0))
			auto_level = CPU_BIND_TO_CORES;
		else if (maxtasks == whole_threads)
			auto_level = CPU_BIND_TO_THREADS;

		if (!auto_level) {
			/* no clean fit: just bind to the allocated CPUs */
			if (avail_mask) {
				xfree(req->cpu_bind);
				req->cpu_bind = avail_mask;
				req->cpu_bind_type |= CPU_BIND_MASK;
			}
			slurm_sprint_cpu_bind_type(buf_type,
						   req->cpu_bind_type);
			info("lllp_distribution jobid [%u] auto binding off: %s",
			     req->job_id, buf_type);
			return;
		}

		req->cpu_bind_type |= auto_level;
		xfree(avail_mask);
		slurm_sprint_cpu_bind_type(buf_type, req->cpu_bind_type);
		info("lllp_distribution jobid [%u] implicit auto binding: "
		     "%s, dist %d", req->job_id, buf_type, req->task_dist);
	} else {
		slurm_sprint_cpu_bind_type(buf_type, req->cpu_bind_type);
		info("lllp_distribution jobid [%u] auto binding: %s, dist %d",
		     req->job_id, buf_type, req->task_dist);
	}

	switch (req->task_dist) {
	case SLURM_DIST_BLOCK_BLOCK:
	case SLURM_DIST_CYCLIC_BLOCK:
	case SLURM_DIST_PLANE:
		/* tasks are distributed in blocks within a plane */
		rc = _task_layout_lllp_block(req, node_id, &masks);
		break;
	case SLURM_DIST_CYCLIC:
	case SLURM_DIST_BLOCK:
	case SLURM_DIST_CYCLIC_CYCLIC:
	case SLURM_DIST_BLOCK_CYCLIC:
		rc = _task_layout_lllp_cyclic(req, node_id, &masks); 
		break;
	default:
		if (req->cpus_per_task > 1)
			rc = _task_layout_lllp_multi(req, node_id, &masks);
		else
			rc = _task_layout_lllp_cyclic(req, node_id, &masks);
		req->task_dist = SLURM_DIST_BLOCK_CYCLIC;
		break;
	}

	/* FIXME: I'm worried about core_bitmap with CPU_BIND_TO_SOCKETS &
	 * max_cores - does select/cons_res plugin allocate whole
	 * socket??? Maybe not. Check srun man page.
	 */

	if (rc == SLURM_SUCCESS) {
		_task_layout_display_masks(req, gtid, maxtasks, masks); 
		/* translate abstract masks to actual hardware layout */
		_lllp_map_abstract_masks(maxtasks, masks);
		_task_layout_display_masks(req, gtid, maxtasks, masks); 
#ifdef HAVE_NUMA
		if (req->cpu_bind_type & CPU_BIND_TO_LDOMS) {
			_match_masks_to_ldom(maxtasks, masks);
			_task_layout_display_masks(req, gtid, maxtasks, masks);
		}
#endif
		/* convert masks into cpu_bind mask string */
		_lllp_generate_cpu_bind(req, maxtasks, masks);
	} else {
		char *avail_mask = _alloc_mask(req,
					       &whole_nodes,  &whole_sockets,
					       &whole_cores,  &whole_threads,
					       &part_sockets, &part_cores);
		if (avail_mask) {
			xfree(req->cpu_bind);
			req->cpu_bind = avail_mask;
			req->cpu_bind_type &= (~bind_mode);
			req->cpu_bind_type |= CPU_BIND_MASK;
		}
		slurm_sprint_cpu_bind_type(buf_type, req->cpu_bind_type);
		error("lllp_distribution jobid [%u] overriding binding: %s",
		      req->job_id, buf_type);
		error("Verify socket/core/thread counts in configuration");
	}

	/* A layout routine that fails early returns without allocating the
	 * mask array, so there may be nothing to free. */
	if (masks)
		_lllp_free_masks(maxtasks, masks);
}


/*
 * _get_local_node_info - get job allocation details for this node
 *
 * The credential describes node geometry in runs: entry [index] of
 * sockets_per_node / cores_per_socket applies to sock_core_rep_count[index]
 * consecutive nodes. This walks those runs, accumulating the number of
 * cores that precede the requested node.
 *
 * IN: arg         - job credential arguments
 * IN: job_node_id - index of the local node in the job allocation
 * IN/OUT: sockets - pointer to socket count variable
 * IN/OUT: cores   - pointer to cores_per_socket count variable
 * OUT:  returns the core_bitmap index of the first core for this node,
 *       or -1 (sockets/cores untouched) when job_node_id is (uint32_t) -1,
 *       i.e. a negative "not found" index stored in the unsigned argument
 *
 * NOTE(review): the number of runs in the credential is not visible here,
 * so a job_node_id beyond the allocation is not detected - confirm that
 * callers only pass valid indices.
 */
static int _get_local_node_info(slurm_cred_arg_t *arg, uint32_t job_node_id,
				uint16_t *sockets, uint16_t *cores)
{
	int bit_start = 0, bit_finish = 0;
	int index = -1;
	uint32_t rep, nodes_seen = 0;

	if (job_node_id == (uint32_t) -1)
		return -1;

	/* Count nodes with an unsigned counter: comparing a signed counter
	 * that starts at -1 against the unsigned job_node_id converts the
	 * -1 to UINT_MAX, which would end the walk before it starts. */
	do {
		index++;
		for (rep = 0; rep < arg->sock_core_rep_count[index] &&
			      nodes_seen <= job_node_id; rep++) {
			bit_start = bit_finish;
			bit_finish += arg->sockets_per_node[index] *
					arg->cores_per_socket[index];
			nodes_seen++;
		}
	} while (nodes_seen <= job_node_id);

	*sockets = arg->sockets_per_node[index];
	*cores   = arg->cores_per_socket[index];
	return bit_start;
}

/*
 * _enforce_limits - trim a CPU mask down to the request's max_sockets and
 * max_cores limits.
 *
 * IN req        - launch request supplying max_sockets and max_cores
 * IN/OUT mask   - CPU mask laid out socket-major, then core, then thread
 * IN hw_sockets - sockets on the node (not consulted)
 * IN hw_cores   - cores per socket
 * IN hw_threads - threads per core
 *
 * Pass 1 keeps the first max_sockets sockets holding any set bit and
 * clears every later one. Pass 2 does the same per socket for cores,
 * keeping the first max_cores cores of each socket.
 */
static void _enforce_limits(launch_tasks_request_msg_t *req, bitstr_t *mask,
			    uint16_t hw_sockets, uint16_t hw_cores,
			    uint16_t hw_threads)
{
	uint16_t bit, first, nbits, used = 0;
	int cur_socket = -1;

	nbits = bit_size(mask);

	/* pass 1: sockets */
	for (bit = 0; bit < nbits; bit++) {
		if (!bit_test(mask, bit))
			continue;
		/* jump to the socket's last bit so that each socket is
		 * counted once: first..bit spans the whole socket */
		first = bit / (hw_cores * hw_threads) * (hw_cores * hw_threads);
		bit = first + (hw_cores * hw_threads) - 1;
		if (++used > req->max_sockets) {
			bit_nclear(mask, first, bit);
			used--;
		}
	}

	/* pass 2: cores within each socket */
	for (bit = 0; bit < nbits; bit++) {
		if (!bit_test(mask, bit))
			continue;
		/* restart the core count whenever a new socket begins */
		first = bit / (hw_cores * hw_threads) * (hw_cores * hw_threads);
		if (first != cur_socket) {
			used = 0;
			cur_socket = first;
		}
		/* first..bit now spans the core containing the set bit */
		first = bit / hw_threads * hw_threads;
		bit = first + hw_threads - 1;
		if (++used > req->max_cores) {
			bit_nclear(mask, first, bit);
			used--;
		}
	}
}

/* Determine which CPUs a job step can use. 
 * IN req - launch request whose credential describes the allocation
 * OUT whole_<entity>_cnt - returns count of whole <entities> in this 
 *                          allocation for this node
 * OUT part_<entity>_cnt  - returns count of partial <entities> in this 
 *                          allocation for this node
 * RET - a string representation of the available mask or NULL on error
 *       (all counts are zero in that case)
 * NOTE: Caller must xfree() the return value.
 *
 * The string is produced by bit_fmt_hexmask() straight from the
 * availability bitmap, the same formatter batch_bind() and
 * _lllp_generate_cpu_bind() use for cpu_bind, so nodes of any CPU count
 * are represented (an int accumulator cannot hold more than 31 CPUs).
 *
 * NOTE(review): the bitmap comes from _get_avail_map() and is not passed
 * through _lllp_map_abstract_masks() the way per-task masks are - confirm
 * whether this mask also needs the abstract-to-physical translation. */
static char *_alloc_mask(launch_tasks_request_msg_t *req,
			 int *whole_node_cnt,  int *whole_socket_cnt, 
			 int *whole_core_cnt,  int *whole_thread_cnt,
			 int *part_socket_cnt, int *part_core_cnt)
{
	uint16_t sockets, cores, threads;
	int c, s, t, i;
	int c_miss, s_miss, t_miss, c_hit, t_hit;
	bitstr_t *alloc_bitmap;
	char *str_mask;

	*whole_node_cnt   = 0;
	*whole_socket_cnt = 0;
	*whole_core_cnt   = 0;
	*whole_thread_cnt = 0;
	*part_socket_cnt  = 0;
	*part_core_cnt    = 0;

	alloc_bitmap = _get_avail_map(req, &sockets, &cores, &threads);
	if (!alloc_bitmap)
		return NULL;

	/* i walks the bitmap in socket/core/thread order; the *_hit and
	 * *_miss flags record whether the entity being scanned has any
	 * allocated or any unallocated member */
	i = 0;
	for (s=0, s_miss=false; s<sockets; s++) {
		for (c=0, c_hit=c_miss=false; c<cores; c++) {
			for (t=0, t_hit=t_miss=false; t<threads; t++) {
				if (bit_test(alloc_bitmap, i)) {
					(*whole_thread_cnt)++;
					t_hit = true;
					c_hit = true;
				} else
					t_miss = true;
				i++;
			}
			if (!t_miss)
				(*whole_core_cnt)++;
			else {
				if (t_hit)
					(*part_core_cnt)++;
				c_miss = true;
			}
		}
		if (!c_miss)
			(*whole_socket_cnt)++;
		else {
			if (c_hit)
				(*part_socket_cnt)++;
			s_miss = true;
		}
	}
	if (!s_miss)
		(*whole_node_cnt)++;

	str_mask = (char *)bit_fmt_hexmask(alloc_bitmap);
	bit_free(alloc_bitmap);
	return str_mask;
}

/*
 * _get_avail_map - build the bitmap of logical CPUs a step may use here
 *
 * IN req         - launch request (credential, max_threads and the limits
 *                  applied by _enforce_limits())
 * OUT hw_sockets - sockets on this node, from conf
 * OUT hw_cores   - cores per socket, from conf
 * OUT hw_threads - threads per core, from conf
 * RET bitmap with one bit per hardware thread, or NULL on error.
 * NOTE: Caller must bit_free() the return value.
 */
static bitstr_t *_get_avail_map(launch_tasks_request_msg_t *req,
				uint16_t *hw_sockets, uint16_t *hw_cores,
				uint16_t *hw_threads)
{
	slurm_cred_arg_t cred_arg;
	bitstr_t *core_map, *cpu_map;
	uint16_t core, thr, core_cnt, thr_cnt, job_sockets, job_cores, cpu_cnt;
	uint32_t node_inx;
	int first_core;
	char *hex;

	/* report the hardware geometry to the caller */
	*hw_sockets = conf->sockets;
	*hw_cores   = conf->cores;
	*hw_threads = conf->threads;
	cpu_cnt = (*hw_sockets) * (*hw_cores) * (*hw_threads);

	if (slurm_cred_get_args(req->cred, &cred_arg) != SLURM_SUCCESS) {
		error("task/affinity: job lacks a credential");
		return NULL;
	}

	/* the credential is indexed by position within the whole job
	 * allocation, not within this step */
	node_inx = nodelist_find(cred_arg.job_hostlist, conf->node_name);
	first_core = _get_local_node_info(&cred_arg, node_inx,
					  &job_sockets, &job_cores);
	if (first_core < 0) {
		error("task/affinity: missing node %u in job credential",
		      node_inx);
		slurm_cred_free_args(&cred_arg);
		return NULL;
	}
	debug3("task/affinity: slurmctld s %u c %u; hw s %u c %u t %u",
		job_sockets, job_cores, *hw_sockets, *hw_cores, *hw_threads);

	core_cnt = MIN((job_sockets * job_cores),
		       ((*hw_sockets)*(*hw_cores)));
	core_map = (bitstr_t *) bit_alloc(core_cnt);
	cpu_map  = (bitstr_t *) bit_alloc(cpu_cnt);
	if (!core_map || !cpu_map) {
		error("task/affinity: malloc error");
		bit_free(core_map);
		bit_free(cpu_map);
		slurm_cred_free_args(&cred_arg);
		return NULL;
	}

	/* Copy this node's slice of core_bitmap. The modulo folds the
	 * slice when the controller believes the node has more cores
	 * than the hardware reports. */
	for (core = 0; core < (job_sockets * job_cores); core++) {
		if (bit_test(cred_arg.core_bitmap, first_core+core))
			bit_set(core_map, (core % core_cnt));
	}

	hex = (char *)bit_fmt_hexmask(core_map);
	debug3("task/affinity: job %u.%u CPU mask from slurmctld: %s",
		req->job_id, req->job_step_id, hex);
	xfree(hex);

	/* Widen each allocated core into its threads, using at most
	 * max_threads of them per core. */
	thr_cnt = MIN(req->max_threads, (*hw_threads));
	for (core = 0; core < core_cnt; core++) {
		if (bit_test(core_map, core)) {
			for (thr = 0; thr < thr_cnt; thr++) {
				uint16_t cpu = core * (*hw_threads) + thr;
				bit_set(cpu_map, cpu);
			}
		}
	}

	/* apply the max_sockets and max_cores limits */
	_enforce_limits(req, cpu_map, *hw_sockets, *hw_cores, *hw_threads);
	
	hex = (char *)bit_fmt_hexmask(cpu_map);
	debug3("task/affinity: job %u.%u CPU final mask for local node: %s",
		req->job_id, req->job_step_id, hex);
	xfree(hex);

	bit_free(core_map);
	slurm_cred_free_args(&cred_arg);
	return cpu_map;
}

/*
 * _blot_mask - helper for _expand_masks()
 *
 * Treat the mask as consecutive groups of blot bits and fill every group
 * that contains at least one set bit.
 *
 * IN/OUT mask - mask to widen in place; NULL is ignored
 * IN blot     - group width in bits
 */
static void _blot_mask(bitstr_t *mask, uint16_t blot)
{
	uint16_t pos, nbits;
	int last_filled = -1;

	if (mask == NULL)
		return;

	nbits = bit_size(mask);
	for (pos = 0; pos < nbits; pos++) {
		uint16_t base;

		if (!bit_test(mask, pos))
			continue;
		base = (pos / blot) * blot;
		/* the bits just filled are visited next; skip the refill */
		if (base == last_filled)
			continue;
		bit_nset(mask, base, base+blot-1);
		last_filled = base;
	}
}

/*
 * _expand_masks - widen every task mask to the entity it is bound to
 *
 * IN cpu_bind_type - binding flags; thread binding needs no widening,
 *                    core binding widens to hw_threads bits, socket
 *                    binding to hw_threads*hw_cores bits, and any other
 *                    value leaves the masks alone
 * IN maxtasks      - number of entries in masks
 * IN/OUT masks     - per-task masks, widened in place
 * IN hw_sockets    - sockets on the node (not consulted)
 * IN hw_cores      - cores per socket
 * IN hw_threads    - threads per core
 */
static void _expand_masks(uint16_t cpu_bind_type, const uint32_t maxtasks,
			  bitstr_t **masks, uint16_t hw_sockets,
			  uint16_t hw_cores, uint16_t hw_threads)
{
	uint32_t task;
	int width;

	if (cpu_bind_type & CPU_BIND_TO_THREADS)
		return;

	if (cpu_bind_type & CPU_BIND_TO_CORES)
		width = hw_threads;
	else if (cpu_bind_type & CPU_BIND_TO_SOCKETS)
		width = hw_threads*hw_cores;
	else
		return;

	/* a one-bit entity is already fully covered */
	if (width < 2)
		return;

	for (task = 0; task < maxtasks; task++)
		_blot_mask(masks[task], width);
}

/* 
 * _task_layout_lllp_multi
 *
 * A variant of _task_layout_lllp_cyclic for use with allocations having 
 * more than one CPU per task, put the tasks as close as possible (fill 
 * core rather than going next socket for the extra task)
 *
 * Available CPUs are visited socket by socket, core by core, thread by
 * thread; each task takes the next cpus_per_task available CPUs. When the
 * end of the node is reached with tasks left over, the walk restarts at
 * the first CPU (oversubscription).
 *
 * IN/OUT req  - launch request; cpus_per_task is lowered when the node
 *               has fewer available CPUs than tasks * cpus_per_task
 * IN node_id  - index into req->tasks_to_launch for this node
 * OUT masks_p - receives the newly allocated array of per-task masks. It
 *               is left unset when the availability map cannot be built;
 *               otherwise it is set even when SLURM_ERROR is returned,
 *               so the caller owns and must free it in both cases.
 * RET SLURM_SUCCESS or SLURM_ERROR
 */
static int _task_layout_lllp_multi(launch_tasks_request_msg_t *req, 
				    uint32_t node_id, bitstr_t ***masks_p)
{
	int last_taskcount = -1, taskcount = 0;
	uint16_t c, i, s, t, hw_sockets = 0, hw_cores = 0, hw_threads = 0;
	uint16_t num_threads;
	int size, max_tasks = req->tasks_to_launch[(int)node_id];
	int max_cpus = max_tasks * req->cpus_per_task;
	bitstr_t *avail_map;
	bitstr_t **masks = NULL;
	
	info ("_task_layout_lllp_multi ");

	avail_map = _get_avail_map(req, &hw_sockets, &hw_cores, &hw_threads);
	if (!avail_map)
		return SLURM_ERROR;
	
	*masks_p = xmalloc(max_tasks * sizeof(bitstr_t*));
	masks = *masks_p;
	
	size = bit_set_count(avail_map);
	if (size < max_tasks) {
		error("task/affinity: only %d bits in avail_map for %d tasks!",
		      size, max_tasks);
		bit_free(avail_map);
		return SLURM_ERROR;
	}
	if (size < max_cpus) {
		/* Possible result of overcommit */
		i = size / max_tasks;
		info("task/affinity: reset cpus_per_task from %d to %d",
		     req->cpus_per_task, i);
		req->cpus_per_task = i;
	}
	
	size = bit_size(avail_map);
	num_threads = MIN(req->max_threads, hw_threads);
	i = 0;	/* CPUs handed to the current task so far */
	while (taskcount < max_tasks) {
		/* a full pass that places no task would never terminate */
		if (taskcount == last_taskcount)
			fatal("_task_layout_lllp_multi failure");
		last_taskcount = taskcount; 
		for (s = 0; s < hw_sockets; s++) {
			for (c = 0; c < hw_cores; c++) {
				for (t = 0; t < num_threads; t++) {
					uint16_t bit = s*(hw_cores*hw_threads) +
							c*(hw_threads) + t;
					if (bit_test(avail_map, bit) == 0)
						continue;
					if (masks[taskcount] == NULL)
						masks[taskcount] =
						    (bitstr_t *)bit_alloc(size);
					bit_set(masks[taskcount], bit);
					if (++i < req->cpus_per_task)
						continue;
					i = 0;
					if (++taskcount >= max_tasks)
						break;
				}
				if (taskcount >= max_tasks)
					break;
			}
			if (taskcount >= max_tasks)
				break;
		}
	}
	bit_free(avail_map);
	
	/* last step: expand the masks to bind each task
	 * to the requested resource */
	_expand_masks(req->cpu_bind_type, max_tasks, masks,
			hw_sockets, hw_cores, hw_threads);

	return SLURM_SUCCESS;
}

/* 
 * _task_layout_lllp_cyclic
 *
 * task_layout_lllp_cyclic creates a cyclic distribution at the
 * lowest level of logical processor which is either socket, core or
 * thread depending on the system architecture. The Cyclic algorithm
 * is the same as the Cyclic distribution performed in srun.
 *
 *  Distribution at the lllp: 
 *  -m hostfile|plane|block|cyclic:block|cyclic 
 * 
 * The first distribution "hostfile|plane|block|cyclic" is computed
 * in srun. The second distribution "plane|block|cyclic" is computed
 * locally by each slurmd.
 *  
 * The input to the lllp distribution algorithms is the gids (tasks
 * ids) generated for the local node.
 *  
 * The output is a mapping of the gids onto logical processors
 * (thread/core/socket) which is expressed as cpu_bind masks.
 *
 * Available CPUs are visited with the socket index varying fastest, then
 * the core, then the thread, so consecutive CPUs land on different
 * sockets. The walk restarts at the first CPU when tasks remain after a
 * full pass (oversubscription).
 *
 * IN/OUT req  - launch request; cpus_per_task is lowered when the node
 *               has fewer available CPUs than tasks * cpus_per_task
 * IN node_id  - index into req->tasks_to_launch for this node
 * OUT masks_p - receives the newly allocated array of per-task masks. It
 *               is left unset when the availability map cannot be built;
 *               otherwise it is set even when SLURM_ERROR is returned,
 *               so the caller owns and must free it in both cases.
 * RET SLURM_SUCCESS or SLURM_ERROR
 */
static int _task_layout_lllp_cyclic(launch_tasks_request_msg_t *req, 
				    uint32_t node_id, bitstr_t ***masks_p)
{
	int last_taskcount = -1, taskcount = 0;
	uint16_t c, i, s, t, hw_sockets = 0, hw_cores = 0, hw_threads = 0;
	uint16_t num_threads;
	int size, max_tasks = req->tasks_to_launch[(int)node_id];
	int max_cpus = max_tasks * req->cpus_per_task;
	bitstr_t *avail_map;
	bitstr_t **masks = NULL;
	
	info ("_task_layout_lllp_cyclic ");

	avail_map = _get_avail_map(req, &hw_sockets, &hw_cores, &hw_threads);
	if (!avail_map)
		return SLURM_ERROR;
	
	*masks_p = xmalloc(max_tasks * sizeof(bitstr_t*));
	masks = *masks_p;
	
	size = bit_set_count(avail_map);
	if (size < max_tasks) {
		error("task/affinity: only %d bits in avail_map for %d tasks!",
		      size, max_tasks);
		bit_free(avail_map);
		return SLURM_ERROR;
	}
	if (size < max_cpus) {
		/* Possible result of overcommit */
		i = size / max_tasks;
		info("task/affinity: reset cpus_per_task from %d to %d",
		     req->cpus_per_task, i);
		req->cpus_per_task = i;
	}

	size = bit_size(avail_map);
	num_threads = MIN(req->max_threads, hw_threads);
	i = 0;	/* CPUs handed to the current task so far */
	while (taskcount < max_tasks) {
		/* a full pass that places no task would never terminate */
		if (taskcount == last_taskcount)
			fatal("_task_layout_lllp_cyclic failure");
		last_taskcount = taskcount; 
		for (t = 0; t < num_threads; t++) {
			for (c = 0; c < hw_cores; c++) {
				for (s = 0; s < hw_sockets; s++) {
					uint16_t bit = s*(hw_cores*hw_threads) +
							c*(hw_threads) + t;
					if (bit_test(avail_map, bit) == 0)
						continue;
					if (masks[taskcount] == NULL)
						masks[taskcount] =
						    (bitstr_t *)bit_alloc(size);
					bit_set(masks[taskcount], bit);
					if (++i < req->cpus_per_task)
						continue;
					i = 0;
					if (++taskcount >= max_tasks)
						break;
				}
				if (taskcount >= max_tasks)
					break;
			}
			if (taskcount >= max_tasks)
				break;
		}
	}
	bit_free(avail_map);
	
	/* last step: expand the masks to bind each task
	 * to the requested resource */
	_expand_masks(req->cpu_bind_type, max_tasks, masks,
			hw_sockets, hw_cores, hw_threads);

	return SLURM_SUCCESS;
}

/* 
 * _task_layout_lllp_block
 *
 * task_layout_lllp_block will create a block distribution at the
 * lowest level of logical processor which is either socket, core or
 * thread depending on the system architecture. The Block algorithm
 * is the same as the Block distribution performed in srun.
 *
 *  Distribution at the lllp: 
 *  -m hostfile|plane|block|cyclic:block|cyclic 
 * 
 * The first distribution "hostfile|plane|block|cyclic" is computed
 * in srun. The second distribution "plane|block|cyclic" is computed
 * locally by each slurmd.
 *  
 * The input to the lllp distribution algorithms is the gids (tasks
 * ids) generated for the local node.
 *  
 * The output is a mapping of the gids onto logical processors
 * (thread/core/socket) which is expressed as cpu_bind masks.
 *
 * Three phases:
 *  1. count, per CPU, how many tasks start on it (task_array);
 *  2. give every task a mask holding only its starting CPU;
 *  3. for cpus_per_task > 1, add the following available CPUs to each
 *     task, wrapping to the start of the node when necessary.
 *
 * IN/OUT req  - launch request; cpus_per_task is lowered when the node
 *               has fewer available CPUs than tasks * cpus_per_task
 * IN node_id  - index into req->tasks_to_launch for this node
 * OUT masks_p - receives the newly allocated array of per-task masks; it
 *               is left unset when SLURM_ERROR is returned before the
 *               array is allocated
 * RET SLURM_SUCCESS or SLURM_ERROR
 */
static int _task_layout_lllp_block(launch_tasks_request_msg_t *req, 
				   uint32_t node_id, bitstr_t ***masks_p)
{
	int c, i, j, t, size, last_taskcount = -1, taskcount = 0;
	uint16_t hw_sockets = 0, hw_cores = 0, hw_threads = 0;
	uint16_t num_threads;
	int max_tasks = req->tasks_to_launch[(int)node_id];
	int max_cpus = max_tasks * req->cpus_per_task;
	int *task_array;
	bitstr_t *avail_map;
	bitstr_t **masks = NULL;

	info("_task_layout_lllp_block ");

	avail_map = _get_avail_map(req, &hw_sockets, &hw_cores, &hw_threads);
	if (!avail_map) {
		return SLURM_ERROR;
	}

	size = bit_set_count(avail_map);
	if (size < max_tasks) {
		error("task/affinity: only %d bits in avail_map for %d tasks!",
		      size, max_tasks);
		bit_free(avail_map);
		return SLURM_ERROR;
	}
	if (size < max_cpus) {
		/* Possible result of overcommit */
		i = size / max_tasks;
		info("task/affinity: reset cpus_per_task from %d to %d",
		     req->cpus_per_task, i);
		req->cpus_per_task = i;
	}
	size = bit_size(avail_map);
	*masks_p = xmalloc(max_tasks * sizeof(bitstr_t*));
	masks = *masks_p;

	task_array = xmalloc(size * sizeof(int));
	if (!task_array) {
		error("In lllp_block: task_array memory error");
		bit_free(avail_map);
		return SLURM_ERROR;
	}
	
	/* block distribution with oversubscription */
	num_threads = MIN(req->max_threads, hw_threads);
	c = 0;	/* CPUs counted for the current task so far */
	while(taskcount < max_tasks) {
		/* a full pass that places no task would never terminate */
		if (taskcount == last_taskcount) {
			fatal("_task_layout_lllp_block infinite loop");
		}
		last_taskcount = taskcount;
		/* the abstract map is already laid out in block order,
		 * so just iterate over it
		 */
		for (i = 0; i < size; i++) {
			/* skip unrequested threads */
			if (i%hw_threads >= num_threads)
				continue;
			/* skip unavailable resources */
			if (bit_test(avail_map, i) == 0)
				continue;
			/* if multiple CPUs per task, only
			 * count the task on the first CPU */
			if (c == 0)
				task_array[i] += 1;
			if (++c < req->cpus_per_task)
				continue;
			c = 0;
			if (++taskcount >= max_tasks)
				break;
		}
	}
	/* Distribute the tasks and create per-task masks that only
	 * contain the first CPU. Note that unused resources
	 * (task_array[i] == 0) will get skipped */
	taskcount = 0;
	for (i = 0; i < size; i++) {
		for (t = 0; t < task_array[i]; t++) {
			if (masks[taskcount] == NULL)
				masks[taskcount] = (bitstr_t *)bit_alloc(size);
			bit_set(masks[taskcount++], i);
		}
	}
	/* now set additional CPUs for cpus_per_task > 1 */
	for (t=0; t<max_tasks && req->cpus_per_task>1; t++) {
		if (!masks[t])
			continue;
		for (i = 0; i < size; i++) {
			if (bit_test(masks[t], i) == 0)
				continue;
			for (j=i+1,c=1; j<size && c<req->cpus_per_task;j++) {
				if (bit_test(avail_map, j) == 0)
					continue;
				bit_set(masks[t], j);
				c++;
			}
			if (c < req->cpus_per_task) {
				/* we haven't found all of the CPUs for this
				 * task, so we'll wrap the search to cover the
				 * whole node */
				for (j=0; j<i && c<req->cpus_per_task; j++) {
					if (bit_test(avail_map, j) == 0)
						continue;
					bit_set(masks[t], j);
					c++;
				}
			}
			/* Only the task's starting CPU seeds the search.
			 * Scanning on would treat each CPU just added as
			 * a new seed and keep growing the mask until it
			 * covered every remaining available CPU. */
			break;
		}
	}

	xfree(task_array);
	bit_free(avail_map);

	/* last step: expand the masks to bind each task
	 * to the requested resource */
	_expand_masks(req->cpu_bind_type, max_tasks, masks,
			hw_sockets, hw_cores, hw_threads);

	return SLURM_SUCCESS;
}

/*
 * _lllp_map_abstract_mask
 *
 * Translate one abstract block mask into a physical machine mask by
 * sending every set bit through BLOCK_MAP.
 *
 * IN bitmask - abstract mask to translate (left unmodified)
 * RET newly allocated mask of the same size; the caller owns it
 */
static bitstr_t *_lllp_map_abstract_mask(bitstr_t *bitmask)
{
	const int nbits = bit_size(bitmask);
	bitstr_t *mapped = (bitstr_t *) bit_alloc(nbits);
	int src;

	for (src = 0; src < nbits; src++) {
		int dst;

		if (!bit_test(bitmask,src))
			continue;
		/* remap to physical machine */
		dst = BLOCK_MAP(src);
		bit_set(mapped, dst);
	}
	return mapped;
}

/*
 * _lllp_map_abstract_masks
 *
 * Replace every abstract block mask in the array with its physical
 * machine equivalent; the abstract masks are freed.
 *
 * IN maxtasks  - number of entries in masks
 * IN/OUT masks - array of masks; NULL entries are left as they are
 */
static void _lllp_map_abstract_masks(const uint32_t maxtasks, bitstr_t **masks)
{
	int task;
	bitstr_t *abstract, *physical;

	debug3("_lllp_map_abstract_masks");

	for (task = 0; task < maxtasks; task++) {
		abstract = masks[task];
		if (abstract == NULL)
			continue;
		physical = _lllp_map_abstract_mask(abstract);
		bit_free(abstract);
		masks[task] = physical;
	}
}

/* 
 * _lllp_generate_cpu_bind
 *
 * Generate the cpu_bind type and string given an array of bitstr_t masks
 *
 * The string is the comma-separated list of the hex masks of all non-NULL
 * entries. When it is non-empty it replaces req->cpu_bind and
 * CPU_BIND_MASK is set; otherwise req->cpu_bind becomes NULL and
 * CPU_BIND_VERBOSE is cleared. The CPU_BIND_TO_* flags are always cleared.
 *
 * IN/OUT- job launch request (cpu_bind_type and cpu_bind updated)
 * IN- maximum number of tasks
 * IN- array of masks
 */
static void _lllp_generate_cpu_bind(launch_tasks_request_msg_t *req,
				    const uint32_t maxtasks, bitstr_t **masks)
{
	int i, num_bits=0, masks_len;
	bitstr_t *bitmask;
	bitoff_t charsize;
	char *masks_str = NULL;
	char buf_type[100];

	/* size the buffer for the widest mask, not merely the first one */
	for (i = 0; i < maxtasks; i++) { 
		bitmask = masks[i];
		if (bitmask && (bit_size(bitmask) > num_bits))
			num_bits = bit_size(bitmask);
	}
	charsize = (num_bits + 3) / 4;		/* ASCII hex digits */
	charsize += 3;				/* "0x" and trailing "," */
	masks_len = maxtasks * charsize + 1;	/* number of masks + null */

	debug3("_lllp_generate_cpu_bind %d %d %d", (int) maxtasks,
		(int) charsize, masks_len);

	masks_str = xmalloc(masks_len);
	masks_len = 0;
	for (i = 0; i < maxtasks; i++) {
		char *str;
		int curlen;
		bitmask = masks[i];
		if (bitmask == NULL) {
			continue;
		}
		str = (char *)bit_fmt_hexmask(bitmask);
		curlen = strlen(str) + 1;	/* include the terminator */

		/* turn the previous terminator into the separator */
		if (masks_len > 0)
			masks_str[masks_len-1]=',';
		memcpy(&masks_str[masks_len], str, curlen);
		masks_len += curlen;
		xassert(masks_str[masks_len] == '\0');
		xfree(str);
	}

	xfree(req->cpu_bind);
	if (masks_str[0] != '\0') {
		req->cpu_bind = masks_str;	/* ownership moves to req */
		req->cpu_bind_type |= CPU_BIND_MASK; 
	} else {
		req->cpu_bind = NULL;
		req->cpu_bind_type &= ~CPU_BIND_VERBOSE;
	}

	/* clear mask generation bits */
	req->cpu_bind_type &= ~CPU_BIND_TO_THREADS;
	req->cpu_bind_type &= ~CPU_BIND_TO_CORES;
	req->cpu_bind_type &= ~CPU_BIND_TO_SOCKETS;
	req->cpu_bind_type &= ~CPU_BIND_TO_LDOMS;

	slurm_sprint_cpu_bind_type(buf_type, req->cpu_bind_type);
	info("_lllp_generate_cpu_bind jobid [%u]: %s, %s",
	     req->job_id, buf_type, masks_str);

	/* an empty string was not handed to req, so release it here */
	if (req->cpu_bind == NULL)
		xfree(masks_str);
}