Skip to content
Snippets Groups Projects
salloc.c 28.27 KiB
/*****************************************************************************\
 *  salloc.c - Request a SLURM job allocation and
 *             launch a user-specified command.
 *****************************************************************************
 *  Copyright (C) 2006-2007 The Regents of the University of California.
 *  Copyright (C) 2008-2010 Lawrence Livermore National Security.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Christopher J. Morrone <morrone2@llnl.gov>
 *  CODE-OCEC-09-009. All rights reserved.
 *
 *  This file is part of SLURM, a resource management program.
 *  For details, see <https://computing.llnl.gov/linux/slurm/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  SLURM is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version.  If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with SLURM; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
\*****************************************************************************/

#if HAVE_CONFIG_H
#  include "config.h"
#endif

#include <pwd.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/param.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <termios.h>
#include <time.h>
#include <unistd.h>

#include <slurm/slurm.h>

#include "src/common/basil_resv_conf.h"
#include "src/common/env.h"
#include "src/common/read_config.h"
#include "src/common/slurm_rlimits_info.h"
#include "src/common/uid.h"
#include "src/common/xmalloc.h"
#include "src/common/xsignal.h"
#include "src/common/xstring.h"
#include "src/common/plugstack.h"

#include "src/salloc/salloc.h"
#include "src/salloc/opt.h"
#ifdef HAVE_BG
#include "src/common/node_select.h"
#include "src/plugins/select/bluegene/plugin/bg_boot_time.h"
#include "src/plugins/select/bluegene/wrap_rm_api.h"
#endif

#ifdef HAVE_CRAY
#include "src/common/node_select.h"
#endif

#ifndef __USE_XOPEN_EXTENDED
extern pid_t getsid(pid_t pid);		/* missing from <unistd.h> */
#endif

#define MAX_RETRIES	10
#define POLL_SLEEP	3	/* retry interval in seconds  */

char **command_argv;
int command_argc;
pid_t command_pid = -1;
char *work_dir = NULL;

enum possible_allocation_states allocation_state = NOT_GRANTED;
pthread_mutex_t allocation_state_lock = PTHREAD_MUTEX_INITIALIZER;

static bool exit_flag = false;
static bool allocation_interrupted = false;
static uint32_t pending_job_id = 0;
static time_t last_timeout = 0;
static struct termios saved_tty_attributes;

static void _exit_on_signal(int signo);
static int  _fill_job_desc_from_opts(job_desc_msg_t *desc);
static pid_t  _fork_command(char **command);
static void _forward_signal(int signo);
static void _job_complete_handler(srun_job_complete_msg_t *msg);
static void _node_fail_handler(srun_node_fail_msg_t *msg);
static void _pending_callback(uint32_t job_id);
static void _ping_handler(srun_ping_msg_t *msg);
static void _ring_terminal_bell(void);
static void _set_exit_code(void);
static void _set_rlimits(char **env);
static void _set_spank_env(void);
static void _set_submit_dir_env(void);
static void _signal_while_allocating(int signo);
static void _timeout_handler(srun_timeout_msg_t *msg);
static void _user_msg_handler(srun_user_msg_t *msg);

#ifdef HAVE_BG
static int _wait_bluegene_block_ready(
			resource_allocation_response_msg_t *alloc);
static int _blocks_dealloc(void);
#else
static int _wait_nodes_ready(resource_allocation_response_msg_t *alloc);
#endif

#ifdef HAVE_CRAY
static int  _claim_reservation(resource_allocation_response_msg_t *alloc);
#endif

bool salloc_shutdown = false;
/* Signals that are considered terminal before resource allocation. */
int sig_array[] = {
	SIGHUP, SIGINT, SIGQUIT, SIGPIPE,
	SIGTERM, SIGUSR1, SIGUSR2, 0
};

static void _reset_input_mode (void)
{
	/* SIGTTOU needs to be blocked per the POSIX spec:
	 * http://pubs.opengroup.org/onlinepubs/009695399/functions/tcsetattr.html
	 */
	int sig_block[] = { SIGTTOU, SIGTTIN, 0 };
	xsignal_block (sig_block);
	tcsetattr (STDIN_FILENO, TCSANOW, &saved_tty_attributes);
}

int main(int argc, char *argv[])
{
	log_options_t logopt = LOG_OPTS_STDERR_ONLY;
	job_desc_msg_t desc;
	resource_allocation_response_msg_t *alloc;
	time_t before, after;
	allocation_msg_thread_t *msg_thr;
	char **env = NULL;
	int status = 0, is_interactive;
	int retries = 0;
	pid_t pid = 0;
	pid_t rc_pid = 0;
	int i, rc = 0;
	static char *msg = "Slurm job queue full, sleeping and retrying.";
	slurm_allocation_callbacks_t callbacks;

	is_interactive = isatty(STDIN_FILENO);
	if (is_interactive) {
		bool sent_msg = false;
		/* Wait as long as we are running in the background */
		while (tcgetpgrp(STDIN_FILENO) != (pid = getpgrp())) {
			if (!sent_msg) {
				error("Waiting for %s to be placed in the "
				      "foreground", argv[0]);
				sent_msg = true;
			}
			killpg(pid, SIGTTIN);
		}

		/*
		 * Save tty attributes and reset at exit, in case a child
		 * process died before properly resetting terminal.
		 */
		tcgetattr (STDIN_FILENO, &saved_tty_attributes);
		atexit (_reset_input_mode);
	}

	log_init(xbasename(argv[0]), logopt, 0, NULL);
	_set_exit_code();

	if (spank_init_allocator() < 0) {
		error("Failed to initialize plugin stack");
		exit(error_exit);
	}

	/* Be sure to call spank_fini when salloc exits
	 */
	if (atexit((void (*) (void)) spank_fini) < 0)
		error("Failed to register atexit handler for plugins: %m");


	if (initialize_and_process_args(argc, argv) < 0) {
		error("salloc parameter parsing");
		exit(error_exit);
	}
	/* reinit log with new verbosity (if changed by command line) */
	if (opt.verbose || opt.quiet) {
		logopt.stderr_level += opt.verbose;
		logopt.stderr_level -= opt.quiet;
		logopt.prefix_level = 1;
		log_alter(logopt, 0, NULL);
	}
	if (spank_init_post_opt() < 0) {
		error("Plugin stack post-option processing failed");
		exit(error_exit);
	}

	_set_spank_env();
	_set_submit_dir_env();
	if (opt.cwd && chdir(opt.cwd)) {
		error("chdir(%s): %m", opt.cwd);
		exit(error_exit);
	}

	if (opt.get_user_env_time >= 0) {
		char *user = uid_to_string(opt.uid);
		if (strcmp(user, "nobody") == 0) {
			error("Invalid user id %u: %m", (uint32_t)opt.uid);
			exit(error_exit);
		}
		env = env_array_user_default(user,
					     opt.get_user_env_time,
					     opt.get_user_env_mode);
		xfree(user);
		if (env == NULL)
			exit(error_exit);    /* error already logged */
		_set_rlimits(env);
	}

	/*
	 * Request a job allocation
	 */
	slurm_init_job_desc_msg(&desc);
	if (_fill_job_desc_from_opts(&desc) == -1) {
		exit(error_exit);
	}
	if (opt.gid != (gid_t) -1) {
		if (setgid(opt.gid) < 0) {
			error("setgid: %m");
			exit(error_exit);
		}
	}

	callbacks.ping = _ping_handler;
	callbacks.timeout = _timeout_handler;
	callbacks.job_complete = _job_complete_handler;
	callbacks.user_msg = _user_msg_handler;
	callbacks.node_fail = _node_fail_handler;
	/* create message thread to handle pings and such from slurmctld */
	msg_thr = slurm_allocation_msg_thr_create(&desc.other_port,
						  &callbacks);

	/* NOTE: Do not process signals in separate pthread. The signal will
	 * cause slurm_allocate_resources_blocking() to exit immediately. */
	for (i = 0; sig_array[i]; i++)
		xsignal(sig_array[i], _signal_while_allocating);

	before = time(NULL);
	while ((alloc = slurm_allocate_resources_blocking(&desc, opt.immediate,
					_pending_callback)) == NULL) {
		if ((errno != ESLURM_ERROR_ON_DESC_TO_RECORD_COPY) ||
		    (retries >= MAX_RETRIES))
			break;
		if (retries == 0)
			error("%s", msg);
		else
			debug("%s", msg);
		sleep (++retries);
	}

	/* become the user after the allocation has been requested. */
	if (opt.uid != (uid_t) -1) {
		if (setuid(opt.uid) < 0) {
			error("setuid: %m");
			exit(error_exit);
		}
	}
	if (alloc == NULL) {
		if (allocation_interrupted) {
			/* cancelled by signal */
		} else if (errno == EINTR) {
			error("Interrupted by signal."
			      "  Allocation request rescinded.");
		} else if (opt.immediate &&
			   ((errno == ETIMEDOUT) ||
			    (errno == ESLURM_NOT_TOP_PRIORITY) ||
			    (errno == ESLURM_NODES_BUSY))) {
			error("Unable to allocate resources: %m");
			error_exit = immediate_exit;
		} else {
			error("Failed to allocate resources: %m");
		}
		slurm_allocation_msg_thr_destroy(msg_thr);
		exit(error_exit);
	} else if (!allocation_interrupted) {
		/*
		 * Allocation granted!
		 */
		info("Granted job allocation %u", alloc->job_id);
		pending_job_id = alloc->job_id;
#ifdef HAVE_BG
		if (!_wait_bluegene_block_ready(alloc)) {
			if(!allocation_interrupted)
				error("Something is wrong with the "
				      "boot of the block.");
			goto relinquish;
		}
#else
		if (!_wait_nodes_ready(alloc)) {
			if(!allocation_interrupted)
				error("Something is wrong with the "
				      "boot of the nodes.");
			goto relinquish;
		}	
#endif
#ifdef HAVE_CRAY
		if (!_claim_reservation(alloc)) {
			if(!allocation_interrupted)
				error("Something is wrong with the ALPS "
				      "resource reservation.");
			goto relinquish;
		}
#endif
	}

	after = time(NULL);
	if (opt.bell == BELL_ALWAYS
	    || (opt.bell == BELL_AFTER_DELAY
		&& ((after - before) > DEFAULT_BELL_DELAY))) {
		_ring_terminal_bell();
	}
	if (opt.no_shell)
		exit(0);
	if (allocation_interrupted) {
		/* salloc process received a signal after
		 * slurm_allocate_resources_blocking returned with the
		 * allocation, but before the new signal handlers were
		 * registered.
		 */
		goto relinquish;
	}
	/*
	 * Run the user's command.
	 */
	if (env_array_for_job(&env, alloc, &desc) != SLURM_SUCCESS)
		goto relinquish;

	/* Add default task count for srun, if not already set */
	if (opt.ntasks_set) {
		env_array_append_fmt(&env, "SLURM_NTASKS", "%d", opt.ntasks);
		/* keep around for old scripts */
		env_array_append_fmt(&env, "SLURM_NPROCS", "%d", opt.ntasks);
	}
	if (opt.cpus_per_task > 1) {
		env_array_append_fmt(&env, "SLURM_CPUS_PER_TASK", "%d",
				     opt.cpus_per_task);
	}
	if (opt.overcommit) {
		env_array_append_fmt(&env, "SLURM_OVERCOMMIT", "%d",
			opt.overcommit);
	}
	if (opt.acctg_freq >= 0) {
		env_array_append_fmt(&env, "SLURM_ACCTG_FREQ", "%d",
			opt.acctg_freq);
	}
	if (opt.network)
		env_array_append_fmt(&env, "SLURM_NETWORK", "%s", opt.network);

	env_array_set_environment(env);
	env_array_free(env);
	pthread_mutex_lock(&allocation_state_lock);
	if (allocation_state == REVOKED) {
		error("Allocation was revoked for job %u before command could "
		      "be run", alloc->job_id);
		pthread_mutex_unlock(&allocation_state_lock);
		if (slurm_complete_job(alloc->job_id, status) != 0) {
			error("Unable to clean up allocation for job %u: %m",
			      alloc->job_id);
		}
		return 1;
 	}
	allocation_state = GRANTED;
	pthread_mutex_unlock(&allocation_state_lock);

	/*  Ensure that salloc has initial terminal foreground control.  */
	if (is_interactive) {
		/*
		 * Ignore remaining job-control signals (other than those in
		 * sig_array, which at this state act like SIG_IGN).
		 */
		xsignal(SIGTSTP, SIG_IGN);
		xsignal(SIGTTIN, SIG_IGN);
		xsignal(SIGTTOU, SIG_IGN);

		pid = getpid();
		setpgid(pid, pid);

		tcsetpgrp(STDIN_FILENO, pid);
	}

	command_pid = _fork_command(command_argv);
	/*
	 * Wait for command to exit, OR for waitpid to be interrupted by a
	 * signal.  Either way, we are going to release the allocation next.
	 */
	if (command_pid > 0) {
		setpgid(command_pid, command_pid);
		if (is_interactive)
			tcsetpgrp(STDIN_FILENO, command_pid);

		/* NOTE: Do not process signals in separate pthread.
		 * The signal will cause waitpid() to exit immediately. */
		xsignal(SIGHUP,  _exit_on_signal);

		/* Use WUNTRACED to treat stopped children like terminated ones */
		do {
			rc_pid = waitpid(command_pid, &status, WUNTRACED);
		} while ((rc_pid == -1) && (!exit_flag));

		if ((rc_pid == -1) && (errno != EINTR))
			error("waitpid for %s failed: %m", command_argv[0]);
	}

	if (is_interactive)
		tcsetpgrp(STDIN_FILENO, pid);

	/*
	 * Relinquish the job allocation (if not already revoked).
	 */
relinquish:
	pthread_mutex_lock(&allocation_state_lock);
	if (allocation_state != REVOKED) {
		pthread_mutex_unlock(&allocation_state_lock);

		info("Relinquishing job allocation %d", alloc->job_id);
		if ((slurm_complete_job(alloc->job_id, status) != 0) &&
		    (slurm_get_errno() != ESLURM_ALREADY_DONE))
			error("Unable to clean up job allocation %d: %m",
			      alloc->job_id);
		pthread_mutex_lock(&allocation_state_lock);
		allocation_state = REVOKED;
	}
	pthread_mutex_unlock(&allocation_state_lock);

	slurm_free_resource_allocation_response_msg(alloc);
	slurm_allocation_msg_thr_destroy(msg_thr);

	/*
	 * Figure out what return code we should use.  If the user's command
	 * exited normally, return the user's return code.
	 */
	rc = 1;
	if (rc_pid != -1) {

		if (WIFEXITED(status)) {
			rc = WEXITSTATUS(status);
		} else if (WIFSIGNALED(status)) {
			verbose("Command \"%s\" was terminated by signal %d",
				command_argv[0], WTERMSIG(status));
			/* if we get these signals we return a normal
			 * exit since this was most likely sent from the
			 * user */
			switch(WTERMSIG(status)) {
			case SIGHUP:
			case SIGINT:
			case SIGQUIT:
			case SIGKILL:
				rc = 0;
				break;
			default:
				break;
			}
		}
	}
	return rc;
}

static void _set_exit_code(void)
{
	int i;
	char *val;

	if ((val = getenv("SLURM_EXIT_ERROR"))) {
		i = atoi(val);
		if (i == 0)
			error("SLURM_EXIT_ERROR has zero value");
		else
			error_exit = i;
	}

	if ((val = getenv("SLURM_EXIT_IMMEDIATE"))) {
		i = atoi(val);
		if (i == 0)
			error("SLURM_EXIT_IMMEDIATE has zero value");
		else
			immediate_exit = i;
	}
}

/* Propagate SPANK environment via SLURM_SPANK_ environment variables */
static void _set_spank_env(void)
{
	int i;

	for (i=0; i<opt.spank_job_env_size; i++) {
		if (setenvfs("SLURM_SPANK_%s", opt.spank_job_env[i]) < 0) {
			error("unable to set %s in environment",
			      opt.spank_job_env[i]);
		}
	}
}

/* Set SLURM_SUBMIT_DIR environment variable with current state */
static void _set_submit_dir_env(void)
{
	work_dir = xmalloc(MAXPATHLEN + 1);
	if ((getcwd(work_dir, MAXPATHLEN)) == NULL) {
		error("getcwd failed: %m");
		exit(error_exit);
	}

	if (setenvf(NULL, "SLURM_SUBMIT_DIR", "%s", work_dir) < 0) {
		error("unable to set SLURM_SUBMIT_DIR in environment");
		return;
	}
}

/* Returns 0 on success, -1 on failure */
static int _fill_job_desc_from_opts(job_desc_msg_t *desc)
{
	desc->contiguous = opt.contiguous ? 1 : 0;
	desc->features = opt.constraints;
	desc->gres = opt.gres;
	if (opt.immediate == 1)
		desc->immediate = 1;
	desc->name = xstrdup(opt.job_name);
	desc->reservation = xstrdup(opt.reservation);
	desc->wckey  = xstrdup(opt.wckey);

	desc->req_nodes = opt.nodelist;
	desc->exc_nodes = opt.exc_nodes;
	desc->partition = opt.partition;
	desc->min_nodes = opt.min_nodes;
	if (opt.max_nodes)
		desc->max_nodes = opt.max_nodes;
	desc->user_id = opt.uid;
	desc->group_id = opt.gid;
	if (opt.dependency)
		desc->dependency = xstrdup(opt.dependency);

	if (opt.cpu_bind)
		desc->cpu_bind       = opt.cpu_bind;
	if (opt.cpu_bind_type)
		desc->cpu_bind_type  = opt.cpu_bind_type;
	if (opt.mem_bind)
		desc->mem_bind       = opt.mem_bind;
	if (opt.mem_bind_type)
		desc->mem_bind_type  = opt.mem_bind_type;
	if (opt.plane_size != NO_VAL)
		desc->plane_size     = opt.plane_size;
	desc->task_dist  = opt.distribution;
	if (opt.plane_size != NO_VAL)
		desc->plane_size = opt.plane_size;

	if (opt.licenses)
		desc->licenses = xstrdup(opt.licenses);
	desc->network = opt.network;
	if (opt.nice)
		desc->nice = NICE_OFFSET + opt.nice;
	desc->mail_type = opt.mail_type;
	if (opt.mail_user)
		desc->mail_user = xstrdup(opt.mail_user);
	if (opt.begin)
		desc->begin_time = opt.begin;
	if (opt.account)
		desc->account = xstrdup(opt.account);
	if (opt.acctg_freq >= 0)
		desc->acctg_freq = opt.acctg_freq;
	if (opt.comment)
		desc->comment = xstrdup(opt.comment);
	if (opt.qos)
		desc->qos = xstrdup(opt.qos);

	if (opt.cwd)
		desc->work_dir = xstrdup(opt.cwd);
	else if (work_dir)
		desc->work_dir = xstrdup(work_dir);

	if (opt.hold)
		desc->priority     = 0;
#ifdef HAVE_BG
	if (opt.geometry[0] > 0) {
		int i;
		for (i=0; i<SYSTEM_DIMENSIONS; i++)
			desc->geometry[i] = opt.geometry[i];
	}
#endif
	if (opt.conn_type != (uint16_t)NO_VAL)
		desc->conn_type = opt.conn_type;
	if (opt.reboot)
		desc->reboot = 1;
	if (opt.no_rotate)
		desc->rotate = 0;
	if (opt.blrtsimage)
		desc->blrtsimage = xstrdup(opt.blrtsimage);
	if (opt.linuximage)
		desc->linuximage = xstrdup(opt.linuximage);
	if (opt.mloaderimage)
		desc->mloaderimage = xstrdup(opt.mloaderimage);
	if (opt.ramdiskimage)
		desc->ramdiskimage = xstrdup(opt.ramdiskimage);

	/* job constraints */
	if (opt.mincpus > -1)
		desc->pn_min_cpus = opt.mincpus;
	if (opt.realmem > -1)
		desc->pn_min_memory = opt.realmem;
	else if (opt.mem_per_cpu > -1)
		desc->pn_min_memory = opt.mem_per_cpu | MEM_PER_CPU;
	if (opt.tmpdisk > -1)
		desc->pn_min_tmp_disk = opt.tmpdisk;
	if (opt.overcommit) {
		desc->min_cpus = opt.min_nodes;
		desc->overcommit = opt.overcommit;
	} else
		desc->min_cpus = opt.ntasks * opt.cpus_per_task;
	if (opt.ntasks_set)
		desc->num_tasks = opt.ntasks;
	if (opt.cpus_set)
		desc->cpus_per_task = opt.cpus_per_task;
	if (opt.ntasks_per_node)
		desc->ntasks_per_node = opt.ntasks_per_node;
	if (opt.ntasks_per_socket > -1)
		desc->ntasks_per_socket = opt.ntasks_per_socket;
	if (opt.ntasks_per_core > -1)
		desc->ntasks_per_core = opt.ntasks_per_core;

	/* node constraints */
	if (opt.sockets_per_node != NO_VAL)
		desc->sockets_per_node = opt.sockets_per_node;
	if (opt.cores_per_socket != NO_VAL)
		desc->cores_per_socket = opt.cores_per_socket;
	if (opt.threads_per_core != NO_VAL)
		desc->threads_per_core = opt.threads_per_core;

	if (opt.no_kill)
		desc->kill_on_node_fail = 0;
	if (opt.time_limit  != NO_VAL)
		desc->time_limit = opt.time_limit;
	if (opt.time_min  != NO_VAL)
		desc->time_min = opt.time_min;
	desc->shared = opt.shared;
	desc->job_id = opt.jobid;

	desc->wait_all_nodes = opt.wait_all_nodes;
	if (opt.warn_signal)
		desc->warn_signal = opt.warn_signal;
	if (opt.warn_time)
		desc->warn_time = opt.warn_time;

	if (opt.spank_job_env_size) {
		desc->spank_job_env      = opt.spank_job_env;
		desc->spank_job_env_size = opt.spank_job_env_size;
	}

	return 0;
}

static void _ring_terminal_bell(void)
{
        if (isatty(STDOUT_FILENO)) {
                fprintf(stdout, "\a");
                fflush(stdout);
        }
}

/* returns the pid of the forked command, or <0 on error */
static pid_t _fork_command(char **command)
{
	pid_t pid;

	pid = fork();
	if (pid < 0) {
		error("fork failed: %m");
	} else if (pid == 0) {
		/* child */
		setpgid(getpid(), 0);

		/*
		 * Reset job control signals.
		 * Suspend (TSTP) is not restored (ignored, as in the parent):
		 * shells with job-control override this and look after their
		 * processes.
		 * Suspending single commands is more complex and would require
		 * adding full shell-like job control to salloc.
		 */
		xsignal(SIGINT, SIG_DFL);
		xsignal(SIGQUIT, SIG_DFL);
		xsignal(SIGTTIN, SIG_DFL);
		xsignal(SIGTTOU, SIG_DFL);

		execvp(command[0], command);

		/* should only get here if execvp failed */
		error("Unable to exec command \"%s\"", command[0]);
		exit(error_exit);
	}
	/* parent returns */
	return pid;
}

static void _pending_callback(uint32_t job_id)
{
	info("Pending job allocation %u", job_id);
	pending_job_id = job_id;
}

static void _exit_on_signal(int signo)
{
	_forward_signal(signo);
	exit_flag = true;
}

static void _forward_signal(int signo)
{
	if (command_pid > 0)
		killpg(command_pid, signo);
}

static void _signal_while_allocating(int signo)
{
	allocation_interrupted = true;
	if (pending_job_id != 0) {
		slurm_complete_job(pending_job_id, 0);
	}
}

/* This typically signifies the job was cancelled by scancel */
static void _job_complete_handler(srun_job_complete_msg_t *comp)
{
	if (pending_job_id && (pending_job_id != comp->job_id)) {
		error("Ignoring bogus job_complete call: job %u is not "
		      "job %u", pending_job_id, comp->job_id);
		return;
	}

	if (comp->step_id == NO_VAL) {
		pthread_mutex_lock(&allocation_state_lock);
		if (allocation_state != REVOKED) {
			/* If the allocation_state is already REVOKED, then
			 * no need to print this message.  We probably
			 * relinquished the allocation ourself.
			 */
			if (last_timeout && (last_timeout < time(NULL))) {
				info("Job %u has exceeded its time limit and "
				     "its allocation has been revoked.",
				     comp->job_id);
			} else {
				info("Job allocation %u has been revoked.",
				     comp->job_id);
			}
		}
		allocation_state = REVOKED;
		pthread_mutex_unlock(&allocation_state_lock);
		/*
		 * Clean up child process: only if the forked process has not
		 * yet changed state (waitpid returning 0).
		 */
		if ((command_pid > -1) &&
		    (waitpid(command_pid, NULL, WNOHANG) == 0)) {
			int signal = 0;
#if defined(HAVE_CRAY)
			signal = SIGTERM;
#else
			if (opt.kill_command_signal_set)
				signal = opt.kill_command_signal;
#endif
			if (signal) {
				 verbose("Sending signal %d to command \"%s\","
					 " pid %d",
					 signal, command_argv[0], command_pid);
				_forward_signal(signal);
			}
		}
	} else {
		verbose("Job step %u.%u is finished.",
			comp->job_id, comp->step_id);
	}
}

/*
 * Job has been notified of it's approaching time limit.
 * Job will be killed shortly after timeout.
 * This RPC can arrive multiple times with the same or updated timeouts.
 * FIXME: We may want to signal the job or perform other action for this.
 * FIXME: How much lead time do we want for this message? Some jobs may
 *	require tens of minutes to gracefully terminate.
 */
static void _timeout_handler(srun_timeout_msg_t *msg)
{
	if (msg->timeout != last_timeout) {
		last_timeout = msg->timeout;
		verbose("Job allocation time limit to be reached at %s",
			ctime(&msg->timeout));
	}
}

static void _user_msg_handler(srun_user_msg_t *msg)
{
	info("%s", msg->msg);
}

static void _ping_handler(srun_ping_msg_t *msg)
{
	/* the api will respond so there really isn't anything to do
	   here */
}

static void _node_fail_handler(srun_node_fail_msg_t *msg)
{
	error("Node failure on %s", msg->nodelist);
}

static void _set_rlimits(char **env)
{
	slurm_rlimits_info_t *rli;
	char env_name[25] = "SLURM_RLIMIT_";
	char *env_value, *p;
	struct rlimit r;
	//unsigned long env_num;
	rlim_t env_num;

	for (rli=get_slurm_rlimits_info(); rli->name; rli++) {
		if (rli->propagate_flag != PROPAGATE_RLIMITS)
			continue;
		strcpy(&env_name[sizeof("SLURM_RLIMIT_")-1], rli->name);
		env_value = getenvp(env, env_name);
		if (env_value == NULL)
			continue;
		unsetenvp(env, env_name);
		if (getrlimit(rli->resource, &r) < 0) {
			error("getrlimit(%s): %m", env_name+6);
			continue;
		}
		env_num = strtol(env_value, &p, 10);
		if (p && (p[0] != '\0')) {
			error("Invalid environment %s value %s",
			      env_name, env_value);
			continue;
		}
		if (r.rlim_cur == env_num)
			continue;
		r.rlim_cur = (rlim_t) env_num;
		if (setrlimit(rli->resource, &r) < 0) {
			error("setrlimit(%s): %m", env_name+6);
			continue;
		}
	}
}

#ifdef HAVE_BG
/* returns 1 if job and nodes are ready for job to begin, 0 otherwise */
static int _wait_bluegene_block_ready(resource_allocation_response_msg_t *alloc)
{
	int is_ready = 0, i, rc;
	char *block_id = NULL;
	int cur_delay = 0;
	int max_delay = BG_FREE_PREVIOUS_BLOCK + BG_MIN_BLOCK_BOOT +
			(BG_INCR_BLOCK_BOOT * alloc->node_cnt);

	pending_job_id = alloc->job_id;
	select_g_select_jobinfo_get(alloc->select_jobinfo,
				    SELECT_JOBDATA_BLOCK_ID,
				    &block_id);

	for (i=0; (cur_delay < max_delay); i++) {
		if (i == 1)
			info("Waiting for block %s to become ready for job",
			     block_id);
		if (i) {
			sleep(POLL_SLEEP);
			rc = _blocks_dealloc();
			if ((rc == 0) || (rc == -1))
				cur_delay += POLL_SLEEP;
			debug("still waiting");
		}

		rc = slurm_job_node_ready(alloc->job_id);

		if (rc == READY_JOB_FATAL)
			break;				/* fatal error */
		if ((rc == READY_JOB_ERROR) || (rc == EAGAIN))
			continue;			/* retry */
		if ((rc & READY_JOB_STATE) == 0)	/* job killed */
			break;
		if (rc & READY_NODE_STATE) {		/* job and node ready */
			is_ready = 1;
			break;
		}
		if (allocation_interrupted)
			break;
	}
	if (is_ready)
     		info("Block %s is ready for job", block_id);
	else if (!allocation_interrupted)
		error("Block %s still not ready", block_id);
	else	/* allocation_interrupted and slurmctld not responing */
		is_ready = 0;

	xfree(block_id);
	pending_job_id = 0;

	return is_ready;
}

/*
 * Test if any BG blocks are in deallocating state since they are
 * probably related to this job we will want to sleep longer
 * RET	1:  deallocate in progress
 *	0:  no deallocate in progress
 *     -1: error occurred
 */
static int _blocks_dealloc(void)
{
	static block_info_msg_t *bg_info_ptr = NULL, *new_bg_ptr = NULL;
	int rc = 0, error_code = 0, i;

	if (bg_info_ptr) {
		error_code = slurm_load_block_info(bg_info_ptr->last_update,
						   &new_bg_ptr, SHOW_ALL);
		if (error_code == SLURM_SUCCESS)
			slurm_free_block_info_msg(bg_info_ptr);
		else if (slurm_get_errno() == SLURM_NO_CHANGE_IN_DATA) {
			error_code = SLURM_SUCCESS;
			new_bg_ptr = bg_info_ptr;
		}
	} else {
		error_code = slurm_load_block_info((time_t) NULL,
						   &new_bg_ptr, SHOW_ALL);
	}

	if (error_code) {
		error("slurm_load_partitions: %s",
		      slurm_strerror(slurm_get_errno()));
		return -1;
	}
	for (i=0; i<new_bg_ptr->record_count; i++) {
		if(new_bg_ptr->block_array[i].state
		   == RM_PARTITION_DEALLOCATING) {
			rc = 1;
			break;
		}
	}
	bg_info_ptr = new_bg_ptr;
	return rc;
}
#else
/* returns 1 if job and nodes are ready for job to begin, 0 otherwise */
static int _wait_nodes_ready(resource_allocation_response_msg_t *alloc)
{
	int is_ready = 0, i, rc;
	int cur_delay = 0;
	int suspend_time, resume_time, max_delay;

	suspend_time = slurm_get_suspend_timeout();
	resume_time  = slurm_get_resume_timeout();
	if ((suspend_time == 0) || (resume_time == 0))
		return 1;	/* Power save mode disabled */
	max_delay = suspend_time + resume_time;
	max_delay *= 5;		/* Allow for ResumeRate support */
	pending_job_id = alloc->job_id;

	if (opt.wait_all_nodes == (uint16_t) NO_VAL)
		opt.wait_all_nodes = DEFAULT_WAIT_ALL_NODES;

	for (i=0; (cur_delay < max_delay); i++) {
		if (i) {
			if (i == 1)
				info("Waiting for nodes to boot");
			else
				debug("still waiting");
			sleep(POLL_SLEEP);
			cur_delay += POLL_SLEEP;
		}

		if (opt.wait_all_nodes)
			rc = slurm_job_node_ready(alloc->job_id);
		else {
			is_ready = 1;
			break;
		}

		if (rc == READY_JOB_FATAL)
			break;				/* fatal error */
		if ((rc == READY_JOB_ERROR) || (rc == EAGAIN))
			continue;			/* retry */
		if ((rc & READY_JOB_STATE) == 0)	/* job killed */
			break;
		if (rc & READY_NODE_STATE) {		/* job and node ready */
			is_ready = 1;
			break;
		}
		if (allocation_interrupted)
			break;
	}
	if (is_ready) {
		if (i > 0)
     			info ("Nodes %s are ready for job", alloc->node_list);
	} else if (!allocation_interrupted)
		error("Nodes %s are still not ready", alloc->node_list);
	else	/* allocation_interrupted or slurmctld not responing */
		is_ready = 0;

	pending_job_id = 0;

	return is_ready;
}
#endif	/* HAVE_BG */

#ifdef HAVE_CRAY
/* returns 1 if job and nodes are ready for job to begin, 0 otherwise */
static int _claim_reservation(resource_allocation_response_msg_t *alloc)
{
	int rc = 0;
	uint32_t resv_id = 0;

	select_g_select_jobinfo_get(alloc->select_jobinfo,
				    SELECT_JOBDATA_RESV_ID,
				    &resv_id);
	if (!resv_id)
		return rc;
	if (basil_resv_conf(resv_id, alloc->job_id) == SLURM_SUCCESS)
		rc = 1;
	xfree(resv_id);
	return rc;
}
#endif