/*****************************************************************************\
 *  proc_req.c - process incoming messages to slurmctld
 *****************************************************************************
 *  Copyright (C) 2002 The Regents of the University of California.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Moe Jette <jette@llnl.gov>, Kevin Tew <tew1@llnl.gov>, et al.
 *  UCRL-CODE-2002-040.
 *  
 *  This file is part of SLURM, a resource management program.
 *  For details, see <http://www.llnl.gov/linux/slurm/>.
 *  
 *  SLURM is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *  
 *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *  
 *  You should have received a copy of the GNU General Public License along
 *  with SLURM; if not, write to the Free Software Foundation, Inc.,
 *  59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
\*****************************************************************************/

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#ifdef WITH_PTHREADS
#  include <pthread.h>
#endif				/* WITH_PTHREADS */

#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <slurm/slurm_errno.h>

#include "src/common/daemonize.h"
#include "src/common/fd.h"
#include "src/common/hostlist.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/pack.h"
#include "src/common/read_config.h"
#include "src/common/slurm_auth.h"
#include "src/common/slurm_cred.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/xstring.h"

#if HAVE_ELAN
#  include "src/common/qsw.h"
#endif

#include "src/slurmctld/locks.h"
#include "src/slurmctld/proc_req.h"
#include "src/slurmctld/read_config.h"
#include "src/slurmctld/slurmctld.h"

#define BUF_SIZE	  1024	/* Temporary buffer size */
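/* (BUF_SIZE is only the initial allocation for the pack buffer built in
 *  _slurm_rpc_job_step_get_info(); the pack routines presumably grow the
 *  buffer as needed.) */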

static void         _fill_ctld_conf(slurm_ctl_conf_t * build_ptr);
static inline bool 	_is_super_user(uid_t uid);
static void         _kill_job_on_msg_fail(uint32_t job_id);
static int          _make_step_cred(struct step_record *step_rec, 
				    slurm_cred_t *slurm_cred);
inline static void  _slurm_rpc_allocate_resources(slurm_msg_t * msg);
inline static void  _slurm_rpc_allocate_and_run(slurm_msg_t * msg);
inline static void  _slurm_rpc_dump_conf(slurm_msg_t * msg);
inline static void  _slurm_rpc_dump_jobs(slurm_msg_t * msg);
inline static void  _slurm_rpc_dump_nodes(slurm_msg_t * msg);
inline static void  _slurm_rpc_dump_partitions(slurm_msg_t * msg);
inline static void  _slurm_rpc_epilog_complete(slurm_msg_t * msg);
inline static void  _slurm_rpc_job_step_kill(slurm_msg_t * msg);
inline static void  _slurm_rpc_job_step_complete(slurm_msg_t * msg);
inline static void  _slurm_rpc_job_step_create(slurm_msg_t * msg);
inline static void  _slurm_rpc_job_step_get_info(slurm_msg_t * msg);
inline static void  _slurm_rpc_job_will_run(slurm_msg_t * msg);
inline static void  _slurm_rpc_node_registration(slurm_msg_t * msg);
inline static void  _slurm_rpc_old_job_alloc(slurm_msg_t * msg);
inline static void  _slurm_rpc_ping(slurm_msg_t * msg);
inline static void  _slurm_rpc_reconfigure_controller(slurm_msg_t * msg);
inline static void  _slurm_rpc_shutdown_controller(slurm_msg_t * msg);
inline static void  _slurm_rpc_shutdown_controller_immediate(slurm_msg_t *
							     msg);
inline static void  _slurm_rpc_submit_batch_job(slurm_msg_t * msg);
inline static void  _slurm_rpc_update_job(slurm_msg_t * msg);
inline static void  _slurm_rpc_update_node(slurm_msg_t * msg);
inline static void  _slurm_rpc_update_partition(slurm_msg_t * msg);
inline static void  _slurm_rpc_delete_partition(slurm_msg_t * msg);
inline static void  _update_cred_key(void);
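
/* The _slurm_rpc_* handlers below each acquire the slurmctld locks they
 * need, perform the request, and (in most cases) send a response back to
 * the requester; handlers that modify state also verify the requester's
 * identity via g_slurm_auth_get_uid(). */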

/*
 * diff_tv_str - build a string showing the time difference between two times
 * IN tv1 - start of event
 * IN tv2 - end of event
 * OUT tv_str - place to put delta time in format "usec=%ld"
 * IN len_tv_str - size of tv_str in bytes
 */
inline void diff_tv_str(struct timeval *tv1,struct timeval *tv2, 
		char *tv_str, int len_tv_str)
{
	long delta_t;
	delta_t  = (tv2->tv_sec  - tv1->tv_sec) * 1000000;
	delta_t +=  tv2->tv_usec - tv1->tv_usec;
	snprintf(tv_str, len_tv_str, "usec=%ld", delta_t);
}
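
/* (The "usec=..." string built here is what the RPC handlers below report
 *  as TIME_STR in their log messages; the START_TIMER and END_TIMER macros
 *  bracket the timed portion of each handler.) */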

/*
 * slurmctld_req  - Process an individual RPC request
 * IN/OUT msg - the request message, data associated with the message is freed
 */
void slurmctld_req (slurm_msg_t * msg)
{

	switch (msg->msg_type) {
	case REQUEST_RESOURCE_ALLOCATION:
		_slurm_rpc_allocate_resources(msg);
		slurm_free_job_desc_msg(msg->data);
		break;
	case REQUEST_ALLOCATION_AND_RUN_JOB_STEP:
		_slurm_rpc_allocate_and_run(msg);
		slurm_free_job_desc_msg(msg->data);
		break;
	case REQUEST_BUILD_INFO:
		_slurm_rpc_dump_conf(msg);
		slurm_free_last_update_msg(msg->data);
		break;
	case REQUEST_JOB_INFO:
		_slurm_rpc_dump_jobs(msg);
		slurm_free_job_info_request_msg(msg->data);
		break;
	case REQUEST_NODE_INFO:
		_slurm_rpc_dump_nodes(msg);
		slurm_free_last_update_msg(msg->data);
		break;
	case REQUEST_PARTITION_INFO:
		_slurm_rpc_dump_partitions(msg);
		slurm_free_last_update_msg(msg->data);
		break;
	case MESSAGE_EPILOG_COMPLETE:
		_slurm_rpc_epilog_complete(msg);
		slurm_free_epilog_complete_msg(msg->data);
		break;
	case REQUEST_CANCEL_JOB_STEP:
		_slurm_rpc_job_step_kill(msg);
		slurm_free_job_step_kill_msg(msg->data);
		break;
	case REQUEST_COMPLETE_JOB_STEP:
		_slurm_rpc_job_step_complete(msg);
		slurm_free_job_complete_msg(msg->data);
		break;
	case REQUEST_JOB_STEP_CREATE:
		_slurm_rpc_job_step_create(msg);
		slurm_free_job_step_create_request_msg(msg->data);
		break;
	case REQUEST_JOB_STEP_INFO:
		_slurm_rpc_job_step_get_info(msg);
		slurm_free_job_step_info_request_msg(msg->data);
		break;
	case REQUEST_JOB_WILL_RUN:
		_slurm_rpc_job_will_run(msg);
		slurm_free_job_desc_msg(msg->data);
		break;
	case MESSAGE_NODE_REGISTRATION_STATUS:
		_slurm_rpc_node_registration(msg);
		slurm_free_node_registration_status_msg(msg->data);
		break;
	case REQUEST_OLD_JOB_RESOURCE_ALLOCATION:
		_slurm_rpc_old_job_alloc(msg);
		slurm_free_old_job_alloc_msg(msg->data);
		break;
	case REQUEST_PING:
		_slurm_rpc_ping(msg);
		/* No body to free */
		break;
	case REQUEST_RECONFIGURE:
		_slurm_rpc_reconfigure_controller(msg);
		/* No body to free */
		break;
	case REQUEST_CONTROL:
		_slurm_rpc_shutdown_controller(msg);
		/* No body to free */
		break;
	case REQUEST_SHUTDOWN:
		_slurm_rpc_shutdown_controller(msg);
		slurm_free_shutdown_msg(msg->data);
		break;
	case REQUEST_SHUTDOWN_IMMEDIATE:
		_slurm_rpc_shutdown_controller_immediate(msg);
		/* No body to free */
		break;
	case REQUEST_SUBMIT_BATCH_JOB:
		_slurm_rpc_submit_batch_job(msg);
		slurm_free_job_desc_msg(msg->data);
		break;
	case REQUEST_UPDATE_JOB:
		_slurm_rpc_update_job(msg);
		slurm_free_job_desc_msg(msg->data);
		break;
	case REQUEST_UPDATE_NODE:
		_slurm_rpc_update_node(msg);
		slurm_free_update_node_msg(msg->data);
		break;
	case REQUEST_UPDATE_PARTITION:
		_slurm_rpc_update_partition(msg);
		slurm_free_update_part_msg(msg->data);
		break;
	case REQUEST_DELETE_PARTITION:
		_slurm_rpc_delete_partition(msg);
		slurm_free_delete_part_msg(msg->data);
		break;
	case REQUEST_NODE_REGISTRATION_STATUS:
		error("slurmctld is talking with itself. SlurmctldPort == SlurmdPort");
		slurm_send_rc_msg(msg, EINVAL);
		break;
		error("invalid RPC msg_type=%d", msg->msg_type);
		slurm_send_rc_msg(msg, EINVAL);
		break;
	}
}
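
/* Note: the request body (msg->data) is freed above, after each handler
 * returns, so the individual handlers must not free it themselves. */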

/*
 * _fill_ctld_conf - make a copy of current slurm configuration
 *	this is done with locks set so the data can change at other times
 * OUT conf_ptr - place to copy configuration to
 */
void _fill_ctld_conf(slurm_ctl_conf_t * conf_ptr)
{
	conf_ptr->last_update         = time(NULL);
	conf_ptr->authtype            = slurmctld_conf.authtype;
	conf_ptr->backup_addr         = slurmctld_conf.backup_addr;
	conf_ptr->backup_controller   = slurmctld_conf.backup_controller;
	conf_ptr->control_addr        = slurmctld_conf.control_addr;
	conf_ptr->control_machine     = slurmctld_conf.control_machine;
	conf_ptr->epilog              = slurmctld_conf.epilog;
	conf_ptr->fast_schedule       = slurmctld_conf.fast_schedule;
	conf_ptr->first_job_id        = slurmctld_conf.first_job_id;
	conf_ptr->hash_base           = slurmctld_conf.hash_base;
	conf_ptr->heartbeat_interval  = slurmctld_conf.heartbeat_interval;
	conf_ptr->inactive_limit      = slurmctld_conf.inactive_limit;
	conf_ptr->job_comp_loc        = slurmctld_conf.job_comp_loc;
	conf_ptr->job_comp_type       = slurmctld_conf.job_comp_type;
	conf_ptr->job_credential_private_key = 
			slurmctld_conf.job_credential_private_key;
	conf_ptr->job_credential_public_certificate = 
			slurmctld_conf.job_credential_public_certificate;
	conf_ptr->kill_wait           = slurmctld_conf.kill_wait;
	conf_ptr->max_job_cnt         = slurmctld_conf.max_job_cnt;
	conf_ptr->min_job_age         = slurmctld_conf.min_job_age;
	conf_ptr->plugindir           = slurmctld_conf.plugindir;
	conf_ptr->prolog              = slurmctld_conf.prolog;
	conf_ptr->ret2service         = slurmctld_conf.ret2service;
	conf_ptr->slurm_user_id       = slurmctld_conf.slurm_user_id;
	conf_ptr->slurm_user_name     = slurmctld_conf.slurm_user_name;
	conf_ptr->slurmctld_debug     = slurmctld_conf.slurmctld_debug;
	conf_ptr->slurmctld_logfile   = slurmctld_conf.slurmctld_logfile;
	conf_ptr->slurmctld_pidfile   = slurmctld_conf.slurmctld_pidfile;
	conf_ptr->slurmctld_port      = slurmctld_conf.slurmctld_port;
	conf_ptr->slurmctld_timeout   = slurmctld_conf.slurmctld_timeout;
	conf_ptr->slurmd_debug        = slurmctld_conf.slurmd_debug;
	conf_ptr->slurmd_logfile      = slurmctld_conf.slurmd_logfile;
	conf_ptr->slurmd_pidfile      = slurmctld_conf.slurmd_pidfile;
	conf_ptr->slurmd_port         = slurmctld_conf.slurmd_port;
	conf_ptr->slurmd_spooldir     = slurmctld_conf.slurmd_spooldir;
	conf_ptr->slurmd_timeout      = slurmctld_conf.slurmd_timeout;
	conf_ptr->slurm_conf          = slurmctld_conf.slurm_conf;
	conf_ptr->state_save_location = slurmctld_conf.state_save_location;
	conf_ptr->tmp_fs              = slurmctld_conf.tmp_fs;
	conf_ptr->wait_time           = slurmctld_conf.wait_time;
	return;
}
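
/* Note: the string fields above are shallow copies; they point at the
 * strings held in slurmctld_conf rather than freshly allocated memory,
 * which is why the caller fills the table with the config read lock held. */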

/* return true if supplied uid is a super-user: root, self, or SlurmUser */
static inline bool _is_super_user(uid_t uid)
{
	if ( (uid == 0) || 
	     (uid == slurmctld_conf.slurm_user_id) ||
	     (uid == getuid()) )
		return true;
	else
		return false;
}

/* _kill_job_on_msg_fail - The request to create a job record succeeded, 
 *	but the reply message to srun failed. We kill the job to avoid 
 *	leaving it orphaned */
static void _kill_job_on_msg_fail(uint32_t job_id)
{
	/* Locks: Write job, write node */
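	/* (The four slurmctld_lock_t fields are, in order: config, job,
	 *  node, and partition locks; the "Locks:" comments throughout
	 *  this file follow that order.) */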
	slurmctld_lock_t job_write_lock = { 
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };

	error("Job allocate response msg send failure, killing JobId=%u",
		job_id);
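	/* job_complete() arguments: job ID, requesting uid (0 = root),
	 * requeue flag (false), and job return code (0) */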
	lock_slurmctld(job_write_lock);
	job_complete(job_id, 0, false, 0);
	unlock_slurmctld(job_write_lock);
}

/* create a credential for a given job step, return error code */
static int _make_step_cred(struct step_record *step_rec, 
			   slurm_cred_t *slurm_cred)
{
	slurm_cred_arg_t cred_arg;

	cred_arg.jobid    = step_rec->job_ptr->job_id;
	cred_arg.stepid   = step_rec->step_id;
	cred_arg.uid      = step_rec->job_ptr->user_id;
	cred_arg.hostlist = step_rec->step_node_list;

	if ( (*slurm_cred = slurm_cred_create(slurmctld_config.cred_ctx, 
			&cred_arg)) == NULL) {
		error("slurm_cred_create error");
		return ESLURM_INVALID_JOB_CREDENTIAL;
	}

	return SLURM_SUCCESS;
}
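
/* (Callers are responsible for releasing the credential with
 *  slurm_cred_destroy() once the response has been sent; see
 *  _slurm_rpc_allocate_and_run() and _slurm_rpc_job_step_create().) */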

/* _slurm_rpc_allocate_resources:  process RPC to allocate resources for 
 *	a job */
static void _slurm_rpc_allocate_resources(slurm_msg_t * msg)
{
	/* init */
	int error_code = SLURM_SUCCESS;
	slurm_msg_t response_msg;
	job_desc_msg_t *job_desc_msg = (job_desc_msg_t *) msg->data;
	char *node_list_ptr = NULL;
	uint16_t num_cpu_groups = 0;
	uint32_t *cpus_per_node = NULL, *cpu_count_reps = NULL;
	uint32_t job_id = 0;
	resource_allocation_response_msg_t alloc_msg;
	/* Locks: Write job, write node, read partition */
	slurmctld_lock_t job_write_lock = { 
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK };
	uid_t uid;
	uint16_t node_cnt = 0;
	slurm_addr *node_addr = NULL;
	int immediate = job_desc_msg->immediate;

	START_TIMER;
	debug2("Processing RPC: REQUEST_RESOURCE_ALLOCATION");

	/* do RPC call */
	dump_job_desc(job_desc_msg);
	uid = g_slurm_auth_get_uid(msg->cred);
	if ( (uid != job_desc_msg->user_id) && (!_is_super_user(uid)) ) {
		error_code = ESLURM_USER_ID_MISSING;
		error("Security violation, RESOURCE_ALLOCATE from uid=%u",
		      (unsigned int) uid);
	}

	if (error_code == SLURM_SUCCESS) {
		lock_slurmctld(job_write_lock);
		error_code = job_allocate(job_desc_msg, &job_id,
					  &node_list_ptr, &num_cpu_groups,
					  &cpus_per_node, &cpu_count_reps,
					  immediate, false, true, uid,
					  &node_cnt, &node_addr);
		unlock_slurmctld(job_write_lock);
	}

	/* return result */
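	/* A request that failed only because its partition is currently
	 * unavailable (and was not flagged "immediate") still gets a normal
	 * allocation response; the error code is passed back to srun in
	 * alloc_msg.error_code. */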
	if ((error_code == SLURM_SUCCESS) ||
	    ((immediate == 0) && 
	     (error_code == ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE))) {
		info("_slurm_rpc_allocate_resources JobId=%u NodeList=%s %s",
			job_id, node_list_ptr, TIME_STR);

		/* send job_ID  and node_name_ptr */
		alloc_msg.cpu_count_reps = cpu_count_reps;
		alloc_msg.cpus_per_node  = cpus_per_node;
		alloc_msg.error_code     = error_code;
		alloc_msg.job_id         = job_id;
		alloc_msg.node_addr      = node_addr;
		alloc_msg.node_cnt       = node_cnt;
		alloc_msg.node_list      = node_list_ptr;
		alloc_msg.num_cpu_groups = num_cpu_groups;
		response_msg.msg_type = RESPONSE_RESOURCE_ALLOCATION;
		response_msg.data = &alloc_msg;

		if (slurm_send_node_msg(msg->conn_fd, &response_msg) < 0)
			_kill_job_on_msg_fail(job_id);
	} else {	/* allocate error */
		info("_slurm_rpc_allocate_resources: %s ", 
		     slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
	}
}

/* _slurm_rpc_allocate_and_run: process RPC to allocate resources for a job 
 *	and initiate a job step */
static void _slurm_rpc_allocate_and_run(slurm_msg_t * msg)
{
	/* init */
	int error_code = SLURM_SUCCESS;
	slurm_msg_t response_msg;
	job_desc_msg_t *job_desc_msg = (job_desc_msg_t *) msg->data;
	char *node_list_ptr = NULL;
	uint16_t num_cpu_groups = 0;
	uint32_t *cpus_per_node = NULL, *cpu_count_reps = NULL;
	uint32_t job_id;
	resource_allocation_and_run_response_msg_t alloc_msg;
	struct step_record *step_rec;
	slurm_cred_t slurm_cred;
	job_step_create_request_msg_t req_step_msg;
	/* Locks: Write job, write node, read partition */
	slurmctld_lock_t job_write_lock = { 
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK };
	uid_t uid;
	uint16_t node_cnt;
	slurm_addr *node_addr;
	int immediate = true;   /* implicit job_desc_msg->immediate == true */

	START_TIMER;
	debug2("Processing RPC: REQUEST_ALLOCATE_AND_RUN_JOB_STEP");

	/* do RPC call */
	dump_job_desc(job_desc_msg);
	uid = g_slurm_auth_get_uid(msg->cred);
	if ( (uid != job_desc_msg->user_id) && (!_is_super_user(uid)) ) {
		error("Security violation, ALLOCATE_AND_RUN RPC from uid=%u",
		      (unsigned int) uid);
		slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
		return;
	}

	lock_slurmctld(job_write_lock);
	error_code = job_allocate(job_desc_msg, &job_id,
				  &node_list_ptr, &num_cpu_groups,
				  &cpus_per_node, &cpu_count_reps,
				  immediate, false, true, uid,
				  &node_cnt, &node_addr);

	/* return result */
	if (error_code) {
		unlock_slurmctld(job_write_lock);
		info("_slurm_rpc_allocate_and_run: %s", 
		     slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
		return;
	}

	req_step_msg.job_id     = job_id;
	req_step_msg.user_id    = job_desc_msg->user_id;
	req_step_msg.node_count = INFINITE;
	req_step_msg.cpu_count  = job_desc_msg->num_procs;
	req_step_msg.num_tasks  = job_desc_msg->num_tasks;
	req_step_msg.task_dist  = job_desc_msg->task_dist;
	error_code = step_create(&req_step_msg, &step_rec, true);
	if (error_code == SLURM_SUCCESS)
		error_code = _make_step_cred(step_rec, &slurm_cred);

	/* note: no need to free step_rec, pointer to global job step record */
	if (error_code) {
		job_complete(job_id, job_desc_msg->user_id, false, 0);
		unlock_slurmctld(job_write_lock);
		info("_slurm_rpc_allocate_and_run creating job step: %s",
			slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
	} else {

		info("_slurm_rpc_allocate_and_run JobId=%u NodeList=%s %s", 
			job_id, node_list_ptr, TIME_STR);

		/* send job_ID  and node_name_ptr */
		alloc_msg.job_id         = job_id;
		alloc_msg.node_list      = node_list_ptr;
		alloc_msg.num_cpu_groups = num_cpu_groups;
		alloc_msg.cpus_per_node  = cpus_per_node;
		alloc_msg.cpu_count_reps = cpu_count_reps;
		alloc_msg.job_step_id    = step_rec->step_id;
		alloc_msg.node_cnt       = node_cnt;
		alloc_msg.node_addr      = node_addr;
		alloc_msg.cred           = slurm_cred;
#ifdef HAVE_ELAN
		alloc_msg.qsw_job = qsw_copy_jobinfo(step_rec->qsw_job);
#endif
		unlock_slurmctld(job_write_lock);
		response_msg.msg_type = RESPONSE_ALLOCATION_AND_RUN_JOB_STEP;
		response_msg.data = &alloc_msg;
		if (slurm_send_node_msg(msg->conn_fd, &response_msg) < 0)
			_kill_job_on_msg_fail(job_id);
		slurm_cred_destroy(slurm_cred);
#ifdef HAVE_ELAN
		qsw_free_jobinfo(alloc_msg.qsw_job);
#endif
		(void) dump_all_job_state();	/* Has its own locks */
	}
}

/* _slurm_rpc_dump_conf - process RPC for Slurm configuration information */
static void _slurm_rpc_dump_conf(slurm_msg_t * msg)
{
	slurm_msg_t response_msg;
	last_update_msg_t *last_time_msg = (last_update_msg_t *) msg->data;
	slurm_ctl_conf_info_msg_t config_tbl;
	/* Locks: Read config */
	slurmctld_lock_t config_read_lock = { 
		READ_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };

	START_TIMER;
	debug2("Processing RPC: REQUEST_BUILD_INFO");
	lock_slurmctld(config_read_lock);

	/* check to see if configuration data has changed */
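	/* (resend the configuration unless the requester's timestamp is
	 *  strictly newer than slurmctld_conf.last_update; the job, node
	 *  and partition dump RPCs below use the same test) */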
	if ((last_time_msg->last_update - 1) >= slurmctld_conf.last_update) {
		unlock_slurmctld(config_read_lock);
		debug2("_slurm_rpc_dump_conf, no change");
		slurm_send_rc_msg(msg, SLURM_NO_CHANGE_IN_DATA);
	} else {
		_fill_ctld_conf(&config_tbl);
		unlock_slurmctld(config_read_lock);
		END_TIMER;
		debug2("_slurm_rpc_dump_conf %s", TIME_STR);

		/* init response_msg structure */
		response_msg.address = msg->address;
		response_msg.msg_type = RESPONSE_BUILD_INFO;
		response_msg.data = &config_tbl;

		/* send message */
		slurm_send_node_msg(msg->conn_fd, &response_msg);
	}
}

/* _slurm_rpc_dump_jobs - process RPC for job state information */
static void _slurm_rpc_dump_jobs(slurm_msg_t * msg)
{
	char *dump;
	int dump_size;
	slurm_msg_t response_msg;
	job_info_request_msg_t *last_time_msg =
	    (job_info_request_msg_t *) msg->data;
	/* Locks: Read job */
	slurmctld_lock_t job_read_lock = { 
		NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };

	START_TIMER;
	debug2("Processing RPC: REQUEST_JOB_INFO");
	lock_slurmctld(job_read_lock);

	if ((last_time_msg->last_update - 1) >= last_job_update) {
		unlock_slurmctld(job_read_lock);
		debug2("_slurm_rpc_dump_jobs, no change");
		slurm_send_rc_msg(msg, SLURM_NO_CHANGE_IN_DATA);
	} else {
		pack_all_jobs(&dump, &dump_size);
		unlock_slurmctld(job_read_lock);
		END_TIMER;
		debug2("_slurm_rpc_dump_jobs, size=%d %s",
		     dump_size, TIME_STR);

		/* init response_msg structure */
		response_msg.address = msg->address;
		response_msg.msg_type = RESPONSE_JOB_INFO;
		response_msg.data = dump;
		response_msg.data_size = dump_size;

		/* send message */
		slurm_send_node_msg(msg->conn_fd, &response_msg);
		xfree(dump);
	}
}

/* _slurm_rpc_dump_nodes - process RPC for node state information */
static void _slurm_rpc_dump_nodes(slurm_msg_t * msg)
{
	char *dump;
	int dump_size;
	slurm_msg_t response_msg;
	last_update_msg_t *last_time_msg = (last_update_msg_t *) msg->data;
	/* Locks: Read node */
	slurmctld_lock_t node_read_lock = { 
		NO_LOCK, NO_LOCK, READ_LOCK, NO_LOCK };

	START_TIMER;
	debug2("Processing RPC: REQUEST_NODE_INFO");
	lock_slurmctld(node_read_lock);

	if ((last_time_msg->last_update - 1) >= last_node_update) {
		unlock_slurmctld(node_read_lock);
		debug2("_slurm_rpc_dump_nodes, no change");
		slurm_send_rc_msg(msg, SLURM_NO_CHANGE_IN_DATA);
	} else {
		pack_all_node(&dump, &dump_size);
		unlock_slurmctld(node_read_lock);
		END_TIMER;
		debug2("_slurm_rpc_dump_nodes, size=%d %s",
		     dump_size, TIME_STR);

		/* init response_msg structure */
		response_msg.address = msg->address;
		response_msg.msg_type = RESPONSE_NODE_INFO;
		response_msg.data = dump;
		response_msg.data_size = dump_size;

		/* send message */
		slurm_send_node_msg(msg->conn_fd, &response_msg);
		xfree(dump);
	}
}

/* _slurm_rpc_dump_partitions - process RPC for partition state information */
static void _slurm_rpc_dump_partitions(slurm_msg_t * msg)
{
	char *dump;
	int dump_size;
	slurm_msg_t response_msg;
	last_update_msg_t *last_time_msg = (last_update_msg_t *) msg->data;
	/* Locks: Read partition */
	slurmctld_lock_t part_read_lock = { 
		NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };

	START_TIMER;
	debug2("Processing RPC: REQUEST_PARTITION_INFO");
	lock_slurmctld(part_read_lock);

	if ((last_time_msg->last_update - 1) >= last_part_update) {
		unlock_slurmctld(part_read_lock);
		debug2("_slurm_rpc_dump_partitions, no change");
		slurm_send_rc_msg(msg, SLURM_NO_CHANGE_IN_DATA);
	} else {
		pack_all_part(&dump, &dump_size);
		unlock_slurmctld(part_read_lock);
		END_TIMER;
		debug2("_slurm_rpc_dump_partitions, size=%d %s",
		     dump_size, TIME_STR);

		/* init response_msg structure */
		response_msg.address = msg->address;
		response_msg.msg_type = RESPONSE_PARTITION_INFO;
		response_msg.data = dump;
		response_msg.data_size = dump_size;

		/* send message */
		slurm_send_node_msg(msg->conn_fd, &response_msg);
		xfree(dump);
	}
}

/* _slurm_rpc_epilog_complete - process RPC noting the completion of 
 * the epilog denoting the completion of a job in its entirety */
static void  _slurm_rpc_epilog_complete(slurm_msg_t * msg)
{
	DEF_TIMERS;
	/* Locks: Write job, write node */
	slurmctld_lock_t job_write_lock = { 
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
	uid_t uid;
	epilog_complete_msg_t *epilog_msg = 
		(epilog_complete_msg_t *) msg->data;
	bool run_scheduler = false;

	START_TIMER;
	debug2("Processing RPC: MESSAGE_EPILOG_COMPLETE");
	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		error("Security violation, EPILOG_COMPLETE RPC from uid=%u",
		      (unsigned int) uid);
		return;
	}

	lock_slurmctld(job_write_lock);
	if (job_epilog_complete(epilog_msg->job_id, epilog_msg->node_name, 
	                        epilog_msg->return_code))
		run_scheduler = true;
	unlock_slurmctld(job_write_lock);
	END_TIMER;

	if (epilog_msg->return_code)
		error("_slurm_rpc_epilog_complete JobId=%u Node=%s Err=%s %s",
			epilog_msg->job_id, epilog_msg->node_name,
			slurm_strerror(epilog_msg->return_code), TIME_STR);
	else
		debug2("_slurm_rpc_epilog_complete JobId=%u Node=%s %s",
			epilog_msg->job_id, epilog_msg->node_name,
			TIME_STR);

	/* Functions below provide their own locking */
	if (run_scheduler) {
		(void) schedule();
		save_all_state();
	}
}

/* _slurm_rpc_job_step_kill - process RPC to cancel an entire job or 
 * an individual job step */
static void _slurm_rpc_job_step_kill(slurm_msg_t * msg)
{
	/* init */
	int error_code = SLURM_SUCCESS;
	job_step_kill_msg_t *job_step_kill_msg =
	    (job_step_kill_msg_t *) msg->data;
	/* Locks: Write job, write node */
	slurmctld_lock_t job_write_lock = { 
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_CANCEL_JOB_STEP");
	uid = g_slurm_auth_get_uid(msg->cred);
	lock_slurmctld(job_write_lock);

	/* do RPC call */
	if (job_step_kill_msg->job_step_id == NO_VAL) {
		error_code = job_signal(job_step_kill_msg->job_id, 
					job_step_kill_msg->signal, uid);
		unlock_slurmctld(job_write_lock);
		END_TIMER;

		/* return result */
		if (error_code) {
			info("_slurm_rpc_job_step_kill JobId=%u: %s", 
				job_step_kill_msg->job_id, 
				slurm_strerror(error_code));
			slurm_send_rc_msg(msg, error_code);
		} else {
			info("_slurm_rpc_job_step_kill JobId=%u %s",
				job_step_kill_msg->job_id, TIME_STR);
			slurm_send_rc_msg(msg, SLURM_SUCCESS);

			/* Below function provides its own locking */
			(void) dump_all_job_state();
		}
	} else {
		error_code = job_step_signal(job_step_kill_msg->job_id,
					     job_step_kill_msg->job_step_id,
					     job_step_kill_msg->signal,
					     uid);
		unlock_slurmctld(job_write_lock);
		END_TIMER;

		/* return result */
		if (error_code) {
			info("_slurm_rpc_job_step_kill StepId=%u.%u: %s",
				job_step_kill_msg->job_id, 
				job_step_kill_msg->job_step_id, 
				slurm_strerror(error_code));
			slurm_send_rc_msg(msg, error_code);
		} else {
			info("_slurm_rpc_job_step_kill StepId=%u.%u %s",
				job_step_kill_msg->job_id,
				job_step_kill_msg->job_step_id, TIME_STR);
			slurm_send_rc_msg(msg, SLURM_SUCCESS);

			/* Below function provides its own locking */
			(void) dump_all_job_state();
		}
	}
}

/* _slurm_rpc_job_step_complete - process RPC to note the completion of an
 *	entire job or an individual job step */
static void _slurm_rpc_job_step_complete(slurm_msg_t * msg)
{
	int error_code = SLURM_SUCCESS;
	complete_job_step_msg_t *complete_job_step_msg =
	    (complete_job_step_msg_t *) msg->data;
	/* Locks: Write job, write node */
	slurmctld_lock_t job_write_lock = { NO_LOCK, WRITE_LOCK,
		WRITE_LOCK, NO_LOCK
	};
	uid_t uid;
	bool job_requeue = false;
	bool dump_job = false, dump_node = false;
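	/* dump_job/dump_node note whether job or node state changed, so the
	 * state save files are rewritten (below, outside the locks) only
	 * when needed */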
	START_TIMER;
	debug2("Processing RPC: REQUEST_COMPLETE_JOB_STEP");
	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		/* Don't trust slurm_rc, it is not from slurmd */
		complete_job_step_msg->slurm_rc = SLURM_SUCCESS;
	}
	lock_slurmctld(job_write_lock);

	/* do RPC call */
	/* First set node DOWN if fatal error */
	if (complete_job_step_msg->slurm_rc == ESLURM_ALREADY_DONE) {
		/* race condition on job termination, not a real error */
		info("slurmd error running JobId=%u from node=%s: %s",
		      complete_job_step_msg->job_id,
		      complete_job_step_msg->node_name,
		      slurm_strerror(complete_job_step_msg->slurm_rc));
		complete_job_step_msg->slurm_rc = SLURM_SUCCESS;
	}
	if (complete_job_step_msg->slurm_rc != SLURM_SUCCESS) {
		error("Fatal slurmd error running JobId=%u from node=%s: %s",
		      complete_job_step_msg->job_id,
		      complete_job_step_msg->node_name,
		      slurm_strerror(complete_job_step_msg->slurm_rc));
		if (error_code == SLURM_SUCCESS) {
			update_node_msg_t update_node_msg;
			update_node_msg.node_names =
			    complete_job_step_msg->node_name;
			update_node_msg.node_state = NODE_STATE_DOWN;
			update_node_msg.reason = "step complete failure";
			error_code = update_node(&update_node_msg);
			if (complete_job_step_msg->job_rc != SLURM_SUCCESS)
				job_requeue = true;
			dump_job = true;
			dump_node = true;
		}
	}

	/* Mark job and/or job step complete */
	if (complete_job_step_msg->job_step_id == NO_VAL) {
		error_code = job_complete(complete_job_step_msg->job_id,
					  uid, job_requeue,
					  complete_job_step_msg->job_rc);
		unlock_slurmctld(job_write_lock);
		END_TIMER;

		/* return result */
		if (error_code) {
			info("_slurm_rpc_job_step_complete JobId=%u: %s ",
				complete_job_step_msg->job_id, 
				slurm_strerror(error_code));
			slurm_send_rc_msg(msg, error_code);
		} else {
			debug2("_slurm_rpc_job_step_complete JobId=%u %s", 
				complete_job_step_msg->job_id, TIME_STR);
			slurm_send_rc_msg(msg, SLURM_SUCCESS);
			dump_job = true;
		}
	} else {
		error_code =
		    job_step_complete(complete_job_step_msg->job_id,
				      complete_job_step_msg->job_step_id,
				      uid, job_requeue,
				      complete_job_step_msg->job_rc);
		unlock_slurmctld(job_write_lock);
		END_TIMER;

		/* return result */
		if (error_code) {
			info("_slurm_rpc_job_step_complete StepId=%u.%u: %s",
				complete_job_step_msg->job_id, 
				complete_job_step_msg->job_step_id, 
				slurm_strerror(error_code));
			slurm_send_rc_msg(msg, error_code);
		} else {
			info("_slurm_rpc_job_step_complete StepId=%u.%u %s",
				complete_job_step_msg->job_id, 
				complete_job_step_msg->job_step_id, TIME_STR);
			slurm_send_rc_msg(msg, SLURM_SUCCESS);
			dump_job = true;
		}
	}

	if (dump_job)
		(void) dump_all_job_state();	/* Has own locking */
	if (dump_node)
		(void) dump_all_node_state();	/* Has own locking */
}

/* _slurm_rpc_job_step_create - process RPC to create/register a job step 
 *	with the step_mgr */
static void _slurm_rpc_job_step_create(slurm_msg_t * msg)
{
	/* init */
	int error_code = SLURM_SUCCESS;
	slurm_msg_t resp;
	struct step_record *step_rec;
	job_step_create_response_msg_t job_step_resp;
	job_step_create_request_msg_t *req_step_msg =
	    (job_step_create_request_msg_t *) msg->data;
	slurm_cred_t slurm_cred;
	/* Locks: Write jobs, read nodes */
	slurmctld_lock_t job_write_lock = { 
		NO_LOCK, WRITE_LOCK, READ_LOCK, NO_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_JOB_STEP_CREATE");
	dump_step_desc(req_step_msg);
	uid = g_slurm_auth_get_uid(msg->cred);
	if ( (uid != req_step_msg->user_id) && (!_is_super_user(uid)) ) {
		error_code = ESLURM_USER_ID_MISSING;
		error("Security violation, JOB_STEP_CREATE RPC from uid=%u",
			(unsigned int) uid);
	}

	if (error_code == SLURM_SUCCESS) {
		/* issue the RPC */
		lock_slurmctld(job_write_lock);
		error_code = step_create(req_step_msg, &step_rec, false);
	}
	if (error_code == SLURM_SUCCESS)
		error_code = _make_step_cred(step_rec, &slurm_cred);

	/* return result */
	if (error_code) {
		unlock_slurmctld(job_write_lock);
		info("_slurm_rpc_job_step_create: %s", 
			slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
	} else {
		info("_slurm_rpc_job_step_create: StepId=%u.%u %s",
		     step_rec->job_ptr->job_id, step_rec->step_id, TIME_STR);

		job_step_resp.job_step_id = step_rec->step_id;
		job_step_resp.node_list   = xstrdup(step_rec->step_node_list);
		job_step_resp.cred        = slurm_cred;

#ifdef HAVE_ELAN
		job_step_resp.qsw_job =  qsw_copy_jobinfo(step_rec->qsw_job);
#endif
		unlock_slurmctld(job_write_lock);
		resp.address = msg->address;
		resp.msg_type = RESPONSE_JOB_STEP_CREATE;
		resp.data = &job_step_resp;

		slurm_send_node_msg(msg->conn_fd, &resp);
		xfree(job_step_resp.node_list);
		slurm_cred_destroy(slurm_cred);
#ifdef HAVE_ELAN
		qsw_free_jobinfo(job_step_resp.qsw_job);
#endif
		(void) dump_all_job_state();	/* Sets own locks */
	}
}

/* _slurm_rpc_job_step_get_info - process request for job step info */
static void _slurm_rpc_job_step_get_info(slurm_msg_t * msg)
{
	void *resp_buffer = NULL;
	int resp_buffer_size = 0;
	int error_code = SLURM_SUCCESS;
	job_step_info_request_msg_t *request =
	    (job_step_info_request_msg_t *) msg->data;
	/* Locks: Read job */
	slurmctld_lock_t job_read_lock = { NO_LOCK, READ_LOCK,
		NO_LOCK, NO_LOCK
	};

	START_TIMER;
	debug2("Processing RPC: REQUEST_JOB_STEP_INFO");
	lock_slurmctld(job_read_lock);

	if ((request->last_update - 1) >= last_job_update) {
		unlock_slurmctld(job_read_lock);
		debug2("_slurm_rpc_job_step_get_info, no change");
		error_code = SLURM_NO_CHANGE_IN_DATA;
	} else {
		Buf buffer;
		buffer = init_buf(BUF_SIZE);
		error_code =
		    pack_ctld_job_step_info_response_msg(request->job_id,
							 request->step_id,
							 buffer);
		unlock_slurmctld(job_read_lock);
		if (error_code) {
			/* job_id:step_id not found or otherwise *\
			\* error message is printed elsewhere    */
			debug2("_slurm_rpc_job_step_get_info: %s",
			free_buf(buffer);
		} else {
			resp_buffer_size = get_buf_offset(buffer);
			resp_buffer = xfer_buf_data(buffer);
			debug2("_slurm_rpc_job_step_get_info size=%d %s",
			     resp_buffer_size, TIME_STR);
	}

	if (error_code)
		slurm_send_rc_msg(msg, error_code);
	else {
		slurm_msg_t response_msg;

		response_msg.address = msg->address;
		response_msg.msg_type = RESPONSE_JOB_STEP_INFO;
		response_msg.data = resp_buffer;
		response_msg.data_size = resp_buffer_size;