Skip to content
Snippets Groups Projects
srun.c 28.8 KiB
Newer Older
/*****************************************************************************\
 *  srun.c - user interface to allocate resources, submit jobs, and execute 
 *	parallel jobs.
 *****************************************************************************
 *  Copyright (C) 2002-2006 The Regents of the University of California.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Mark Grondona <grondona@llnl.gov>, et. al.
 *  
 *  This file is part of SLURM, a resource management program.
 *  For details, see <http://www.llnl.gov/linux/slurm/>.
 *  
 *  SLURM is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission 
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and 
 *  distribute linked combinations including the two. You must obey the GNU 
 *  General Public License in all respects for all of the code used other than 
 *  OpenSSL. If you modify file(s) with this exception, you may extend this 
 *  exception to your version of the file(s), but you are not obligated to do 
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version.  If you delete this exception statement from all source files in 
 *  the program, then also delete it here.
 *  
 *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *  
 *  You should have received a copy of the GNU General Public License along
 *  with SLURM; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
\*****************************************************************************/
Mark Grondona's avatar
Mark Grondona committed

#ifdef HAVE_CONFIG_H
#  include "config.h"
Mark Grondona's avatar
Mark Grondona committed
#endif

#ifdef WITH_PTHREADS
Mark Grondona's avatar
Mark Grondona committed
#  include <pthread.h>
#endif

#ifdef HAVE_AIX
#  undef HAVE_UNSETENV
#ifndef HAVE_UNSETENV
#  include "src/common/unsetenv.h"
#endif

#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <ctype.h>
#include <fcntl.h>
#include <pwd.h>
Mark Grondona's avatar
Mark Grondona committed
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Mark Grondona's avatar
Mark Grondona committed
#include <signal.h>
#include <unistd.h>
Mark Grondona's avatar
Mark Grondona committed

#include "src/common/fd.h"
#include "src/common/log.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/switch.h"
#include "src/common/xmalloc.h"
#include "src/common/xsignal.h"
#include "src/common/xstring.h"
Danny Auble's avatar
Danny Auble committed
#include "src/common/net.h"
#include "src/common/mpi.h"
#include "src/common/slurm_rlimits_info.h"
#include "src/common/plugstack.h"
#include "src/common/read_config.h"
Danny Auble's avatar
Danny Auble committed
#include "src/srun/srun_job.h"
#include "src/srun/srun_pty.h"
#include "src/srun/multi_prog.h"
#include "src/api/pmi_server.h"
Moe Jette's avatar
Moe Jette committed
#include "src/api/step_launch.h"
#define	TYPE_NOT_TEXT	0
#define	TYPE_TEXT	1
#define	TYPE_SCRIPT	2

mpi_plugin_client_info_t mpi_job_info[1];
static struct termios termdefaults;
int global_rc;
srun_job_t *job = NULL;

/*
 * Per-task bookkeeping for the job step, one bit per task id.
 * All four bitstrings are allocated with the same bit count by
 * _task_state_struct_init().
 */
struct {
	bitstr_t *start_success;	/* set by _task_start() when the launch
					 * response return_code is 0 */
	bitstr_t *start_failure;	/* set by _task_start() when the launch
					 * response return_code is non-zero */
	bitstr_t *finish_normal;	/* set by _task_finish() for tasks that
					 * exited with exit code 0 */
	bitstr_t *finish_abnormal;	/* set by _task_finish() for tasks that
					 * exited non-zero or were signaled */
} task_state;
/*
 * forward declaration of static funcs
 */
static void  _print_job_information(resource_allocation_response_msg_t *resp);
static void  _set_prio_process_env(void);
static int   _set_rlimit_env(void);
static char *_uint16_array_to_str(int count, const uint16_t *array);
static void  _run_srun_prolog (srun_job_t *job);
static void  _run_srun_epilog (srun_job_t *job);
static int   _run_srun_script (srun_job_t *job, char *script);
static int   _slurm_debug_env_val (void);
static int   _call_spank_local_user (srun_job_t *job);
static void  _set_stdio_fds(srun_job_t *job, slurm_step_io_fds_t *cio_fds);
static void  _define_symbols(void);
static void  _pty_restore(void);
static void  _step_opt_exclusive(void);
static void _task_start(launch_tasks_response_msg_t *msg);
static void _task_finish(task_exit_msg_t *msg);
static void _task_state_struct_init(int num_tasks);
static void _task_state_struct_print(void);
static void _task_state_struct_free(void);
static void _handle_intr();
static void _handle_signal(int signo);
static int _setup_signals();
Mark Grondona's avatar
Mark Grondona committed
{
	resource_allocation_response_msg_t *resp;
	env_t *env = xmalloc(sizeof(env_t));
	log_options_t logopt = LOG_OPTS_STDERR_ONLY;
	slurm_step_launch_params_t launch_params;
	slurm_step_launch_callbacks_t callbacks;
	int got_alloc = 0;

	env->stepid = -1;
	env->procid = -1;
	env->nodeid = -1;
	env->cli = NULL;
	env->env = NULL;
	debug_level = _slurm_debug_env_val();
	logopt.stderr_level += debug_level;
	log_init(xbasename(av[0]), logopt, 0, NULL);
/* 	xsignal(SIGQUIT, _ignore_signal); */
/* 	xsignal(SIGPIPE, _ignore_signal); */
/* 	xsignal(SIGUSR1, _ignore_signal); */
/* 	xsignal(SIGUSR2, _ignore_signal); */

	/* Initialize plugin stack, read options from plugins, etc.
	 */
	if (spank_init(NULL) < 0) {
		fatal("Plug-in initialization failed");

	/* Be sure to call spank_fini when srun exits.
	 */
	if (atexit((void (*) (void)) spank_fini) < 0)
		error("Failed to register atexit handler for plugins: %m");
Mark Grondona's avatar
Mark Grondona committed
	/* set default options, process commandline arguments, and
	 * verify some basic values
	 */
	if (initialize_and_process_args(ac, av) < 0) {
		error ("srun initialization failed");
		exit (1);
	}
	record_ppid();
	/* reinit log with new verbosity (if changed by command line)
		/* If log level is already increased, only increment the
		 *   level to the difference of _verbose an LOG_LEVEL_INFO
		 */
		if ((_verbose -= (logopt.stderr_level - LOG_LEVEL_INFO)) > 0)
			logopt.stderr_level += _verbose;
		logopt.stderr_level -= opt.quiet;
		logopt.prefix_level = 1;
		log_alter(logopt, 0, NULL);
	(void) _set_rlimit_env();
	_set_prio_process_env();
	(void) _set_umask_env();
	/* Set up slurmctld message handler */
	slurmctld_msg_init();
	/* now global "opt" should be filled in and available,
Mark Grondona's avatar
Mark Grondona committed
	 * create a job from opt
	if (opt.test_only) {
		int rc = allocate_test();
		if (rc) {
			slurm_perror("allocation failure");
	} else if (opt.no_alloc) {
		info("do not allocate resources");
		job = job_create_noalloc(); 
	} else if ((resp = existing_allocation())) {
		job_id = resp->job_id;
		if (opt.alloc_nodelist == NULL)
                       opt.alloc_nodelist = xstrdup(resp->node_list);
		if (opt.exclusive)
			_step_opt_exclusive();
		job = job_step_create_allocation(resp);
		slurm_free_resource_allocation_response_msg(resp);

		if (!job || create_job_step(job) < 0)
Mark Grondona's avatar
Mark Grondona committed
	} else {
		/* Combined job allocation and job step launch */
#ifdef HAVE_FRONT_END
		uid_t my_uid = getuid();
		if ((my_uid != 0)
		&&  (my_uid != slurm_get_slurm_user_id())) {
			error("srun task launch not supported on this system");
			exit(1);
		}
#endif
		if ( !(resp = allocate_nodes()) ) 
			exit(1);
		_print_job_information(resp);
Danny Auble's avatar
Danny Auble committed
		job = job_create_allocation(resp);
		opt.exclusive = false;	/* not applicable for this step */
		if (!job || create_job_step(job) < 0) {
			slurm_complete_job(job->jobid, 1);
		slurm_free_resource_allocation_response_msg(resp);
	/*
	 *  Become --uid user
	 */
	if (_become_user () < 0)
		info ("Warning: Unable to assume uid=%lu\n", opt.uid);

	env->nprocs = opt.nprocs;
	env->cpus_per_task = opt.cpus_per_task;
	if (opt.ntasks_per_node != NO_VAL)
		env->ntasks_per_node = opt.ntasks_per_node;
	if (opt.ntasks_per_socket != NO_VAL)
		env->ntasks_per_socket = opt.ntasks_per_socket;
	if (opt.ntasks_per_core != NO_VAL)
		env->ntasks_per_core = opt.ntasks_per_core;
	env->distribution = opt.distribution;
	if (opt.plane_size != NO_VAL)
		env->plane_size = opt.plane_size;
	env->cpu_bind_type = opt.cpu_bind_type;
	env->cpu_bind = opt.cpu_bind;
	env->mem_bind_type = opt.mem_bind_type;
	env->mem_bind = opt.mem_bind;
	env->overcommit = opt.overcommit;
	env->slurmd_debug = opt.slurmd_debug;
	env->labelio = opt.labelio;
	env->comm_port = slurmctld_comm_addr.port;
	env->comm_hostname = slurmctld_comm_addr.hostname;
		uint16_t *tasks = NULL;
		slurm_step_ctx_get(job->step_ctx, SLURM_STEP_CTX_TASKS, 
				   &tasks);

		env->select_jobinfo = job->select_jobinfo;
		env->nhosts = job->nhosts;
		env->nodelist = job->nodelist;
		env->task_count = _uint16_array_to_str(
		env->jobid = job->jobid;
		env->stepid = job->stepid;
Moe Jette's avatar
Moe Jette committed
	if (opt.pty) {
		struct termios term;
		int fd = STDIN_FILENO;

		/* Save terminal settings for restore */
		tcgetattr(fd, &termdefaults); 
		tcgetattr(fd, &term);
		/* Set raw mode on local tty */
		cfmakeraw(&term);
		tcsetattr(fd, TCSANOW, &term);
		atexit(&_pty_restore);
Moe Jette's avatar
Moe Jette committed
		set_winsize(job);
		block_sigwinch();
		pty_thread_create(job);
		env->pty_port = job->pty_port;
		env->ws_col   = job->ws_col;
		env->ws_row   = job->ws_row;
	}
	_task_state_struct_init(opt.nprocs);
	slurm_step_launch_params_t_init(&launch_params);
	launch_params.gid = opt.gid;
	launch_params.argc = opt.argc;
	launch_params.argv = opt.argv;
	launch_params.multi_prog = opt.multi_prog ? true : false;
	launch_params.cwd = opt.cwd;
	launch_params.slurmd_debug = opt.slurmd_debug;
	launch_params.buffered_stdio = !opt.unbuffered;
	launch_params.labelio = opt.labelio ? true : false;
	launch_params.remote_output_filename =fname_remote_string(job->ofname);
	launch_params.remote_input_filename = fname_remote_string(job->ifname);
	launch_params.remote_error_filename = fname_remote_string(job->efname);
	launch_params.task_prolog = opt.task_prolog;
	launch_params.task_epilog = opt.task_epilog;
	launch_params.cpu_bind = opt.cpu_bind;
	launch_params.cpu_bind_type = opt.cpu_bind_type;
	launch_params.mem_bind = opt.mem_bind;
	launch_params.mem_bind_type = opt.mem_bind_type;	
	launch_params.open_mode = opt.open_mode;
	if (opt.acctg_freq >= 0)
		launch_params.acctg_freq = opt.acctg_freq;
	launch_params.pty = opt.pty;
	launch_params.max_sockets     = opt.max_sockets_per_node;
	launch_params.max_cores       = opt.max_cores_per_socket;
	launch_params.max_threads     = opt.max_threads_per_core;
	launch_params.cpus_per_task = opt.cpus_per_task;
	launch_params.ntasks_per_node   = opt.ntasks_per_node;
	launch_params.ntasks_per_socket = opt.ntasks_per_socket;
	launch_params.ntasks_per_core   = opt.ntasks_per_core;
	launch_params.ckpt_path = xstrdup(opt.ckpt_path);
	/* job structure should now be filled in */
	_setup_signals();
	_set_stdio_fds(job, &launch_params.local_fds);
	if (MPIR_being_debugged) {
		launch_params.parallel_debug = true;
		pmi_server_max_threads(1);
	} else {
		launch_params.parallel_debug = false;
	}
	callbacks.task_start = _task_start;
	callbacks.task_finish = _task_finish;
	mpir_init(job->ctx_params.task_count);
	if (_call_spank_local_user (job) < 0) {
		error("Failure in local plugin stack");
		slurm_step_launch_abort(job->step_ctx);
	update_job_state(job, SRUN_JOB_LAUNCHING);
	if (slurm_step_launch(job->step_ctx, slurmctld_comm_addr.hostname, 
	    &launch_params, &callbacks) != SLURM_SUCCESS) {
		error("Application launch failed: %m");
		goto cleanup;
	}
	update_job_state(job, SRUN_JOB_STARTING);
	if (slurm_step_launch_wait_start(job->step_ctx) == SLURM_SUCCESS) {
		update_job_state(job, SRUN_JOB_RUNNING);
		/* Only set up MPIR structures if the step launched
		   correctly. */
		if (opt.multi_prog)
			mpir_set_multi_name(job->ctx_params.task_count,
					    launch_params.argv[0]);
		else
			mpir_set_executable_names(launch_params.argv[0]);
		MPIR_debug_state = MPIR_DEBUG_SPAWNED;
		MPIR_Breakpoint();
		if (opt.debugger_test)
			mpir_dump_proctable();
	} else {
		info("Job step aborted before step completely launched.");
	}
	slurm_step_launch_wait_finish(job->step_ctx);
		slurm_complete_job(job->jobid, global_rc);
	_run_srun_epilog(job);
	slurm_step_ctx_destroy(job->step_ctx);
	mpir_cleanup();
	_task_state_struct_free();
Mark Grondona's avatar
Mark Grondona committed
}
static int _call_spank_local_user (srun_job_t *job)
{
	struct spank_launcher_job_info info[1];
	job_step_create_response_msg_t *step_resp;

	info->uid = opt.uid;
	info->gid = opt.gid;
	info->jobid = job->jobid;
	info->stepid = job->stepid;
	slurm_step_ctx_get(job->step_ctx, SLURM_STEP_CTX_RESP, &step_resp);
	info->step_layout = step_resp->step_layout;
	info->argc = opt.argc;
	info->argv = opt.argv;
/*
 * Read the SLURM_DEBUG environment variable as a decimal log-level offset.
 *
 * Returns 0 when the variable is unset or is followed by any non-numeric
 * trailing characters.  Otherwise returns the parsed value, never lower
 * than -LOG_LEVEL_INFO.
 */
static int _slurm_debug_env_val (void)
{
	const char *text = getenv ("SLURM_DEBUG");
	char *end = NULL;
	long int parsed;

	if (text == NULL)
		return 0;

	parsed = strtol (text, &end, 10);

	/* Anything left over after the number invalidates the setting */
	if (end && (*end != '\0'))
		return 0;

	/* Do not allow the offset to go below -LOG_LEVEL_INFO */
	if (parsed < -LOG_LEVEL_INFO)
		parsed = -LOG_LEVEL_INFO;

	return ((int) parsed);
}

/*
 * Return a string representation of an array of uint16_t elements.
 * Each value in the array is printed in decimal notation and elements
 * are separated by a comma.  If sequential elements in the array
 * contain the same value, the value is written out just once followed
 * by "(xN)", where "N" is the number of times the value is repeated.
 *
 * Example:
 *   The array "1, 2, 1, 1, 1, 3, 2" becomes the string "1,2,1(x3),3,2"
 *
 * Returns an xmalloc'ed string.  Free with xfree().
 */
static char *_uint16_array_to_str(int array_len, const uint16_t *array)
	int i;
	int previous = 0;
	char *sep = ",";  /* seperator */
	char *str = xstrdup("");

	if(array == NULL)
		return str;

	for (i = 0; i < array_len; i++) {
		if ((i+1 < array_len)
		    && (array[i] == array[i+1])) {
				previous++;
				continue;

		if (i == array_len-1) /* last time through loop */
			sep = "";
		if (previous > 0) {
			xstrfmtcat(str, "%u(x%u)%s",
				   array[i], previous+1, sep);
		} else {
			xstrfmtcat(str, "%u%s", array[i], sep);
		}
		previous = 0;
Mark Grondona's avatar
Mark Grondona committed
static void 
_print_job_information(resource_allocation_response_msg_t *resp)
Mark Grondona's avatar
Mark Grondona committed
{
	int i;
	xstrfmtcat(str, "jobid %u: nodes(%u):`%s', cpu counts: ",
		   resp->job_id, resp->node_cnt, resp->node_list);

	for (i = 0; i < resp->num_cpu_groups; i++) {
		xstrfmtcat(str, "%s%u(x%u)",
			   sep, resp->cpus_per_node[i],
		           resp->cpu_count_reps[i]);
		sep = ",";
/* Set SLURM_UMASK environment variable with current state */
static int _set_umask_env(void)
{
	mode_t mask;

	if (getenv("SLURM_UMASK"))	/* use this value */
		return SLURM_SUCCESS;

	mask = (int)umask(0);
	sprintf(mask_char, "0%d%d%d", 
		((mask>>6)&07), ((mask>>3)&07), mask&07);
	if (setenvf(NULL, "SLURM_UMASK", "%s", mask_char) < 0) {
		error ("unable to set SLURM_UMASK in environment");
		return SLURM_FAILURE;
	}
	debug ("propagating UMASK=%s", mask_char); 
/*
 * _set_prio_process_env
 *
 * Export the calling process's scheduling priority, as reported by
 * getpriority(PRIO_PROCESS, 0), in the SLURM_PRIO_PROCESS environment
 * variable.  This supports propagation of the user's nice value and the
 * "PropagatePrioProcess" config keyword.
 *
 * On failure an error is logged and the environment is left unchanged.
 */
static void  _set_prio_process_env(void)
{
	int prio;

	/*
	 * -1 is a legitimate priority value, so errno is the only way to
	 * tell a real getpriority() failure apart; clear it beforehand.
	 */
	errno = 0;
	prio = getpriority (PRIO_PROCESS, 0);
	if ((prio == -1) && (errno != 0)) {
		error ("getpriority(PRIO_PROCESS): %m");
		return;
	}

	if (setenvf (NULL, "SLURM_PRIO_PROCESS", "%d", prio) >= 0) {
		debug ("propagating SLURM_PRIO_PROCESS=%d", prio);
	} else {
		error ("unable to set SLURM_PRIO_PROCESS in environment");
	}
}

/* Set SLURM_RLIMIT_* environment variables with current resource 
 * limit values, reset RLIMIT_NOFILE to maximum possible value */
static int _set_rlimit_env(void)
{
	int                  rc = SLURM_SUCCESS;
	struct rlimit        rlim[1];
	unsigned long        cur;
	char                 name[64], *format;
	slurm_rlimits_info_t *rli;
	for (rli = get_slurm_rlimits_info(); rli->name != NULL; rli++ ) {
		if (getrlimit (rli->resource, rlim) < 0) {
			error ("getrlimit (RLIMIT_%s): %m", rli->name);
		cur = (unsigned long) rlim->rlim_cur;
		snprintf(name, sizeof(name), "SLURM_RLIMIT_%s", rli->name);
		if (opt.propagate && rli->propagate_flag == PROPAGATE_RLIMITS)
			/*
			 * Prepend 'U' to indicate user requested propagate
			 */
			format = "U%lu";
		else
			format = "%lu";
		if (setenvf (NULL, name, format, cur) < 0) {
			error ("unable to set %s in environment", name);
		debug ("propagating RLIMIT_%s=%lu", rli->name, cur);
	}

	/* 
	 *  Now increase NOFILE to the max available for this srun
	 */
	if (getrlimit (RLIMIT_NOFILE, rlim) < 0)
	 	return (error ("getrlimit (RLIMIT_NOFILE): %m"));

	if (rlim->rlim_cur < rlim->rlim_max) {
		rlim->rlim_cur = rlim->rlim_max;
		if (setrlimit (RLIMIT_NOFILE, rlim) < 0) 
			return (error ("Unable to increase max no. files: %m"));
static int _become_user (void)
{
	struct passwd *pwd = getpwuid (opt.uid);

	if (opt.uid == getuid ())
		return (0);

	if ((opt.egid != (gid_t) -1) && (setgid (opt.egid) < 0))
		return (error ("setgid: %m"));

	initgroups (pwd->pw_name, pwd->pw_gid); /* Ignore errors */

	if (setuid (opt.uid) < 0)
		return (error ("setuid: %m"));

	return (0);
}

/*
 * Run the srun prolog script named by opt.prolog, unless no prolog is
 * configured or it is the literal "none" (case-insensitive).  The
 * script's result is only reported at debug level.
 */
static void _run_srun_prolog (srun_job_t *job)
{
	int script_rc;

	if (!opt.prolog || (strcasecmp(opt.prolog, "none") == 0))
		return;

	script_rc = _run_srun_script(job, opt.prolog);
	debug("srun prolog rc = %d", script_rc);
}

/*
 * Run the srun epilog script named by opt.epilog, unless no epilog is
 * configured or it is the literal "none" (case-insensitive).  The
 * script's result is only reported at debug level.
 */
static void _run_srun_epilog (srun_job_t *job)
{
	int script_rc;

	if (!opt.epilog || (strcasecmp(opt.epilog, "none") == 0))
		return;

	script_rc = _run_srun_script(job, opt.epilog);
	debug("srun epilog rc = %d", script_rc);
}

/*
 * Fork and execute a prolog/epilog script, then wait for it to finish.
 *
 * The script receives the application's command line as its arguments:
 * argv[0] is the script path, followed by opt.argv[0..opt.argc-1].
 *
 * job    - currently unused by this function
 * script - path of the script to run; NULL or "" means "nothing to do"
 *
 * Returns:
 *    0  if there is no script, the script is not readable+executable,
 *       or waitpid() fails with anything other than EINTR
 *   -1  if fork() fails
 *   otherwise the raw wait status reported by waitpid() for the child
 *
 * If execv() fails, the child logs an error and exits with status 127.
 */
static int _run_srun_script (srun_job_t *job, char *script)
{
	int status;
	pid_t cpid;
	int i;
	char **args = NULL;

	if (script == NULL || script[0] == '\0')
		return 0;

	if (access(script, R_OK | X_OK) < 0) {
		info("Access denied for %s: %m", script);
		return 0;
	}

	if ((cpid = fork()) < 0) {
		error ("run_srun_script: fork: %m");
		return -1;
	}
	if (cpid == 0) {

		/* set the scripts command line arguments to the arguments
		 * for the application, but shifted one higher
		 *
		 * The vector holds the script name, opt.argc application
		 * arguments and the terminating NULL, so size it from
		 * opt.argc rather than assuming a fixed upper bound.
		 */
		args = xmalloc(sizeof(char *) * ((size_t) opt.argc + 2));
		args[0] = script;
		for (i = 0; i < opt.argc; i++) {
			args[i+1] = opt.argv[i];
		}
		args[i+1] = NULL;
		execv(script, args);
		/* only reached when execv() failed */
		error("help! %m");
		exit(127);
	}

	do {
		if (waitpid(cpid, &status, 0) < 0) {
			/* interrupted by a signal: just wait again */
			if (errno == EINTR)
				continue;
			error("waitpid: %m");
			return 0;
		} else
			return status;
	} while(1);

	/* NOTREACHED */
}
{
	if (fname->name == NULL)
		return 1;
	
	if (fname->taskid != -1)
		return 1;

	return ((fname->type != IO_PER_TASK) && (fname->type != IO_ONE));
}

static void
_set_stdio_fds(srun_job_t *job, slurm_step_io_fds_t *cio_fds)
{
	bool err_shares_out = false;
	int file_flags;

	if (opt.open_mode == OPEN_MODE_APPEND)
		file_flags = O_CREAT|O_WRONLY|O_APPEND;
	else if (opt.open_mode == OPEN_MODE_TRUNCATE)
		file_flags = O_CREAT|O_WRONLY|O_APPEND|O_TRUNC;
	else {
		slurm_ctl_conf_t *conf;
		conf = slurm_conf_lock();
		if (conf->job_file_append)
			file_flags = O_CREAT|O_WRONLY|O_APPEND;
		else
			file_flags = O_CREAT|O_WRONLY|O_APPEND|O_TRUNC;
		slurm_conf_unlock();
	}

	/*
	 * create stdin file descriptor
	 */
	if (_is_local_file(job->ifname)) {
		if ((job->ifname->name == NULL) || (job->ifname->taskid != -1)) {
			cio_fds->in.fd = STDIN_FILENO;
		} else {
			cio_fds->in.fd = open(job->ifname->name, O_RDONLY);
			if (cio_fds->in.fd == -1)
				fatal("Could not open stdin file: %m");
		}
		if (job->ifname->type == IO_ONE) {
			job_step_create_response_msg_t *step_resp = NULL;
			
			slurm_step_ctx_get(job->step_ctx, SLURM_STEP_CTX_RESP,
					   &step_resp);
		
			cio_fds->in.taskid = job->ifname->taskid;
			cio_fds->in.nodeid = slurm_step_layout_host_id(
				step_resp->step_layout, job->ifname->taskid);
		}
	}

	/*
	 * create stdout file descriptor
	 */
	if (_is_local_file(job->ofname)) {
		if ((job->ofname->name == NULL) || (job->ofname->taskid != -1)) {
			cio_fds->out.fd = STDOUT_FILENO;
		} else {
			cio_fds->out.fd = open(job->ofname->name,
			if (cio_fds->out.fd == -1)
				fatal("Could not open stdout file: %m");
		}
		if (job->ofname->name != NULL
		    && job->efname->name != NULL
		    && !strcmp(job->ofname->name, job->efname->name)) {
			err_shares_out = true;
		}
	}

	/*
	 * create seperate stderr file descriptor only if stderr is not sharing
	 * the stdout file descriptor
	 */
	if (err_shares_out) {
		debug3("stdout and stderr sharing a file");
		cio_fds->err.fd = cio_fds->out.fd;
		cio_fds->err.taskid = cio_fds->out.taskid;
	} else if (_is_local_file(job->efname)) {
		if ((job->efname->name == NULL) || (job->efname->taskid != -1)) {
			cio_fds->err.fd = STDERR_FILENO;
		} else {
			cio_fds->err.fd = open(job->efname->name,
			if (cio_fds->err.fd == -1)
				fatal("Could not open stderr file: %m");
		}
	}
}

/*
 * Plugins must be able to resolve symbols at load time.
 * srun links statically with src/api/libslurmhelper instead of linking
 * dynamically with libslurm, so every symbol a plugin may need has to be
 * referenced somewhere inside srun.  The calls below exist only to pull
 * those symbols into the executable; they are not meant to do real work.
 */
static void _define_symbols(void)
{
	/* needed by mvapich and mpichgm */
	(void) slurm_signal_job_step(0, 0, 0);
}

/*
 * Put the terminal back into the state saved in termdefaults.
 * The settings are applied through STDOUT because STDIN is probably
 * closed by the time this runs.  A failure is reported on stderr.
 */
static void _pty_restore(void)
{
	int rc = tcsetattr(STDOUT_FILENO, TCSANOW, &termdefaults);

	if (rc >= 0)
		return;

	fprintf(stderr, "tcsetattr: %s\n", strerror(errno));
}

/*
 * Called when opt.exclusive is set: user task layout controls do not
 * apply to an exclusive step.
 *
 * A user-supplied node count is dropped with a verbose notice.
 * A missing task count, or the use of --relative, --exclude or
 * --nodelist, is reported through fatal().
 */
static void _step_opt_exclusive(void)
{
	/* Silently-ish discard an explicit node count */
	if (opt.nodes_set) {
		verbose("ignoring node count set by --nodes or SLURM_NNODES");
		verbose("  it is incompatible with --exclusive");
		opt.nodes_set = false;
		opt.min_nodes = 1;
		opt.max_nodes = 0;
	}

	/* The task count is mandatory */
	if (!opt.nprocs_set) {
		fatal("--nprocs must be set with --exclusive");
	}

	/* Options that cannot be combined with --exclusive */
	if (opt.relative_set) {
		fatal("--relative disabled, incompatible with --exclusive");
	}
	if (opt.exc_nodes) {
		fatal("--exclude is incompatible with --exclusive");
	}
	if (opt.nodelist) {
		fatal("--nodelist is incompatible with --exclusive");
	}
}

/*
 * Step-launch callback invoked with a node's launch response.
 *
 * For every task listed in the message, record the node name and local
 * pid in that task's MPIR_proctable entry, and mark the task in either
 * task_state.start_success (return_code == 0) or
 * task_state.start_failure (any other return_code).
 */
static void
_task_start(launch_tasks_response_msg_t *msg)
{
	bitstr_t *outcome;
	int n;

	verbose("Node %s (%d), %d tasks started",
		msg->node_name, msg->srun_node_id, msg->count_of_pids);

	/* One return code covers every task in the message */
	if (msg->return_code == 0)
		outcome = task_state.start_success;
	else
		outcome = task_state.start_failure;

	for (n = 0; n < msg->count_of_pids; n++) {
		int tid = msg->task_ids[n];
		MPIR_PROCDESC *entry = &MPIR_proctable[tid];

		/* entry->executable_name is set elsewhere */
		entry->host_name = xstrdup(msg->node_name);
		entry->pid = msg->local_pids[n];

		bit_set(outcome, tid);
	}
}

/*
 * Look up the job id and step id held by step_ctx, log that the step is
 * being terminated, and send SIGKILL to that job step.
 */
static void
_terminate_job_step(slurm_step_ctx_t *step_ctx)
{
	uint32_t jid;
	uint32_t sid;

	slurm_step_ctx_get(step_ctx, SLURM_STEP_CTX_JOBID, &jid);
	slurm_step_ctx_get(step_ctx, SLURM_STEP_CTX_STEPID, &sid);

	info("Terminating job step %u.%u", jid, sid);
	slurm_kill_job_step(jid, sid, SIGKILL);
}

/*
 * Signal handler installed by _task_finish() for SIGALRM: the other
 * tasks did not finish within opt.max_wait seconds of the first exit.
 * Reports the elapsed wait, prints the task state summary and kills the
 * job step.  The signal number itself is not used.
 */
static void
_handle_max_wait(int signo)
{
	(void) signo;

	info("First task exited %ds ago", opt.max_wait);
	_task_state_struct_print();
	_terminate_job_step(job->step_ctx);
}

/*
 * Step-launch callback invoked when a group of tasks has exited with a
 * common wait status (msg->return_code).
 *
 * - Normal exit, code 0:      tasks are marked in finish_normal.
 * - Normal exit, code != 0:   an error is logged per task and the tasks
 *                             are marked in finish_abnormal.
 * - Killed by a signal:       logged per task at verbose level, tasks are
 *                             marked in finish_abnormal, and the result
 *                             counts as exit code 1.
 *
 * global_rc is raised to the largest exit code seen so far.  On the first
 * failing exit with opt.kill_bad_exit set, the job step is terminated.
 * Otherwise, on the first call that reaches the second check with
 * opt.max_wait > 0, a SIGALRM timer of opt.max_wait seconds is armed.
 */
static void
_task_finish(task_exit_msg_t *msg)
{
	static bool first_done = true;
	static bool first_error = true;
	int exit_code = 0;
	int n;

	verbose("%d tasks finished (rc=%u)",
		msg->num_tasks, msg->return_code);

	if (WIFEXITED(msg->return_code)) {
		exit_code = WEXITSTATUS(msg->return_code);
		for (n = 0; n < msg->num_tasks; n++) {
			if (exit_code == 0) {
				bit_set(task_state.finish_normal,
					msg->task_id_list[n]);
				continue;
			}
			error("task %u exited with exit code %d",
			      msg->task_id_list[n], exit_code);
			bit_set(task_state.finish_abnormal,
				msg->task_id_list[n]);
		}
	} else if (WIFSIGNALED(msg->return_code)) {
		exit_code = 1;
		for (n = 0; n < msg->num_tasks; n++) {
			verbose("task %u killed by signal %d",
				msg->task_id_list[n],
				WTERMSIG(msg->return_code));
			bit_set(task_state.finish_abnormal,
				msg->task_id_list[n]);
		}
	}

	/* Remember the worst exit code reported so far */
	if (exit_code > global_rc)
		global_rc = exit_code;

	if (first_error && (exit_code > 0) && opt.kill_bad_exit) {
		first_error = false;
		_terminate_job_step(job->step_ctx);
	} else if (first_done && (opt.max_wait > 0)) {
		/*
		 * First tasks to finish: give the remaining tasks
		 * opt.max_wait seconds before _handle_max_wait() kills
		 * the job step.
		 */
		first_done = false;
		debug2("First task has exited");
		xsignal(SIGALRM, _handle_max_wait);
		verbose("starting alarm of %d seconds", opt.max_wait);
		alarm(opt.max_wait);
	}
}

static void
_task_state_struct_init(int num_tasks)
{
	task_state.start_success = bit_alloc(num_tasks);
	task_state.start_failure = bit_alloc(num_tasks);
	task_state.finish_normal = bit_alloc(num_tasks);
	task_state.finish_abnormal = bit_alloc(num_tasks);
}

/*
 * Tasks will most likely have bits set in multiple of the task_state
 * bit strings (e.g. a task can start normally and then later exit normally)
 * so we ensure that a task is only "seen" once.
 */
static void
_task_state_struct_print(void)
{
	bitstr_t *tmp, *seen, *not_seen;
	char buf[BUFSIZ];
	int len;

	len = bit_size(task_state.finish_abnormal); /* all the same length */
	tmp = bit_alloc(len);
	seen = bit_alloc(len);
	not_seen = bit_alloc(len);
	bit_not(not_seen);

	if (bit_set_count(task_state.finish_abnormal) > 0) {
		bit_copybits(tmp, task_state.finish_abnormal);
		bit_and(tmp, not_seen);
		bit_fmt(buf, BUFSIZ, tmp);
		info("task%s: exited abnormally", buf);
		bit_or(seen, tmp);
		bit_copybits(not_seen, seen);
		bit_not(not_seen);
	}

	if (bit_set_count(task_state.finish_normal) > 0) {
		bit_copybits(tmp, task_state.finish_normal);
		bit_and(tmp, not_seen);
		bit_fmt(buf, BUFSIZ, tmp);
		info("task%s: exited", buf);
		bit_or(seen, tmp);
		bit_copybits(not_seen, seen);
		bit_not(not_seen);
	}

	if (bit_set_count(task_state.start_failure) > 0) {
		bit_copybits(tmp, task_state.start_failure);
		bit_and(tmp, not_seen);
		bit_fmt(buf, BUFSIZ, tmp);
		info("task%s: failed to start", buf);
		bit_or(seen, tmp);
		bit_copybits(not_seen, seen);
		bit_not(not_seen);
	}