Skip to content
Snippets Groups Projects
agent.c 28.5 KiB
Newer Older
/*****************************************************************************\
 *  agent.c - parallel background communication functions. This is where  
 *	logic could be placed for broadcast communications.
 *****************************************************************************
 *  Copyright (C) 2002 The Regents of the University of California.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Moe Jette <jette@llnl.gov>, et. al.
 *  Derived from pdsh written by Jim Garlick <garlick1@llnl.gov>
 *  UCRL-CODE-2002-040.
 *  
 *  This file is part of SLURM, a resource management program.
 *  For details, see <http://www.llnl.gov/linux/slurm/>.
 *  
 *  SLURM is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *  
 *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *  
 *  You should have received a copy of the GNU General Public License along
 *  with SLURM; if not, write to the Free Software Foundation, Inc.,
 *  59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
 *****************************************************************************
 *  Theory of operation:
 *
 *  The functions below permit slurm to initiate parallel tasks as a 
 *  detached thread and let the functions below make sure the work happens. 
 *  For example, when a job's time limit is to be changed slurmctld needs 
 *  to notify the slurmd on every node to which the job was allocated.  
 *  We don't want to hang slurmctld's primary function (the job update RPC)  
 *  to perform this work, so it just initiates an agent to perform the work.  
 *  The agent is passed all details required to perform the work, so it will 
 *  be possible to execute the agent as an pthread, process, or even a daemon 
 *  on some other computer.
 *  The main agent thread creates a separate thread for each node to be
 *  communicated with up to AGENT_THREAD_COUNT. A special watchdog thread  
 *  sends SIGLARM to any threads that have been active (in DSH_ACTIVE state)  
 *  for more than COMMAND_TIMEOUT seconds. 
 *  The agent responds to slurmctld via a function call or an RPC as required.
 *  For example, informing slurmctld that some node is not responding.
 *  All the state for each thread is maintained in thd_t struct, which is 
 *  used by the watchdog thread as well as the communication threads.
\*****************************************************************************/

#ifdef HAVE_CONFIG_H
#endif

#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <string.h>
#include "src/common/list.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/common/slurm_protocol_api.h"
#include "src/slurmctld/agent.h"
#include "src/slurmctld/locks.h"
#include "src/slurmctld/ping_nodes.h"
#include "src/slurmctld/slurmctld.h"
#include "src/slurmctld/srun_comm.h"
#if COMMAND_TIMEOUT == 1
#  define WDOG_POLL 		1	/* secs */
#  define WDOG_POLL 		2	/* secs */
typedef enum {
	DSH_NEW,        /* Request not yet started */
	DSH_ACTIVE,     /* Request in progress */
	DSH_DONE,       /* Request completed normally */
	DSH_NO_RESP,    /* Request timed out */
	DSH_FAILED      /* Request resulted in error */
	pthread_t thread;		/* thread ID */
	pthread_attr_t attr;		/* thread attributes */
	state_t state;			/* thread state */
	time_t start_time;		/* start time */
	time_t end_time;		/* end time or delta time 
					 * upon termination */
	struct sockaddr_in slurm_addr;	/* network address */
	char node_name[MAX_NAME_LEN];	/* node's name */
} thd_t;

typedef struct agent_info {
	pthread_mutex_t thread_mutex;	/* agent specific mutex */
	pthread_cond_t thread_cond;	/* agent specific condition */
	uint32_t thread_count;		/* number of threads records */
	uint32_t threads_active;	/* currently active threads */
	uint16_t retry;			/* if set, keep trying */
	thd_t *thread_struct;		/* thread structures */
	bool get_reply;			/* flag if reply expected */
	slurm_msg_type_t msg_type;	/* RPC to be issued */
	void **msg_args_pptr;		/* RPC data to be used */
typedef struct task_info {
	pthread_mutex_t *thread_mutex_ptr; /* pointer to agent specific 
					    * mutex */
	pthread_cond_t *thread_cond_ptr;/* pointer to agent specific
					 * condition */
	uint32_t *threads_active_ptr;	/* currently active thread ptr */
	thd_t *thread_struct_ptr;	/* thread structures ptr */
	bool get_reply;			/* flag if reply expected */
	slurm_msg_type_t msg_type;	/* RPC to be issued */
	void *msg_args_ptr;		/* ptr to RPC data to be used */
} task_info_t;
typedef struct queued_request {
	agent_arg_t* agent_arg_ptr;	/* The queued request */
	time_t       last_attempt;	/* Time of last xmit attempt */
} queued_request_t;

Moe Jette's avatar
Moe Jette committed
static void _alarm_handler(int dummy);
static inline void _comm_err(char *node_name);
static void _list_delete_retry(void *retry_entry);
static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr);
static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx);
static void _notify_slurmctld_jobs(agent_info_t *agent_ptr);
static void _notify_slurmctld_nodes(agent_info_t *agent_ptr, 
		int no_resp_cnt, int retry_cnt);
static void _purge_agent_args(agent_arg_t *agent_arg_ptr);
static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count);
Moe Jette's avatar
Moe Jette committed
static void _slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg);
static void _spawn_retry_agent(agent_arg_t * agent_arg_ptr);
static void *_thread_per_node_rpc(void *args);
static int   _valid_agent_arg(agent_arg_t *agent_arg_ptr);
static void *_wdog(void *args);
static pthread_mutex_t retry_mutex = PTHREAD_MUTEX_INITIALIZER;
static List retry_list = NULL;		/* agent_arg_t list for retry */
Moe Jette's avatar
Moe Jette committed
 * agent - party responsible for transmitting an common RPC in parallel 
 *	across a set of nodes
 * IN pointer to agent_arg_t, which is xfree'd (including slurm_addr, 
Moe Jette's avatar
Moe Jette committed
 *	node_names and msg_args) upon completion if AGENT_IS_THREAD is set
 * RET always NULL (function format just for use as pthread)
	pthread_attr_t attr_wdog;
	pthread_t thread_wdog;
	agent_arg_t *agent_arg_ptr = args;
	agent_info_t *agent_info_ptr = NULL;
	thd_t *thread_ptr;
	task_info_t *task_specific_ptr;
	/* basic argument value tests */
	if (_valid_agent_arg(agent_arg_ptr))
		goto cleanup;
	xsignal(SIGALRM, _alarm_handler);
	/* initialize the agent data structures */
	agent_info_ptr = _make_agent_info(agent_arg_ptr);
	thread_ptr = agent_info_ptr->thread_struct;
	if (pthread_attr_init(&attr_wdog))
		fatal("pthread_attr_init error %m");
	if (pthread_attr_setdetachstate
	    (&attr_wdog, PTHREAD_CREATE_JOINABLE))
		error("pthread_attr_setdetachstate error %m");
#ifdef PTHREAD_SCOPE_SYSTEM
	if (pthread_attr_setscope(&attr_wdog, PTHREAD_SCOPE_SYSTEM))
		error("pthread_attr_setscope error %m");
	while (pthread_create(&thread_wdog, &attr_wdog, _wdog,
				(void *) agent_info_ptr)) {
		error("pthread_create error %m");
		if (++retries > MAX_RETRIES)
			fatal("Can't create pthread");
		sleep(1);	/* sleep and again */
	fatal("AGENT_THREAD_COUNT value is invalid");
Moe Jette's avatar
Moe Jette committed
	/* start all the other threads (up to AGENT_THREAD_COUNT active) */
	for (i = 0; i < agent_info_ptr->thread_count; i++) {

		/* wait until "room" for another thread */
		slurm_mutex_lock(&agent_info_ptr->thread_mutex);
		while (agent_info_ptr->threads_active >=
		       AGENT_THREAD_COUNT) {
			pthread_cond_wait(&agent_info_ptr->thread_cond,
					  &agent_info_ptr->thread_mutex);
		/* create thread specific data, NOTE: freed from 
		 *      _thread_per_node_rpc() */
		task_specific_ptr = _make_task_data(agent_info_ptr, i);

		if (pthread_attr_init(&thread_ptr[i].attr))
			fatal("pthread_attr_init error %m");
		if (pthread_attr_setdetachstate(&thread_ptr[i].attr,
						PTHREAD_CREATE_DETACHED))
			error("pthread_attr_setdetachstate error %m");
		if (pthread_attr_setscope(&thread_ptr[i].attr,
					  PTHREAD_SCOPE_SYSTEM))
			error("pthread_attr_setscope error %m");
		while ((rc = pthread_create(&thread_ptr[i].thread,
					    &thread_ptr[i].attr,
					    _thread_per_node_rpc,
					    (void *) task_specific_ptr))) {
			error("pthread_create error %m");
			if (agent_info_ptr->threads_active)
				pthread_cond_wait(&agent_info_ptr->
						  thread_cond,
						  &agent_info_ptr->
						  thread_mutex);
				slurm_mutex_unlock(&agent_info_ptr->
						     thread_mutex);
				sleep(1);
				slurm_mutex_lock(&agent_info_ptr->
						   thread_mutex);
		slurm_mutex_unlock(&agent_info_ptr->thread_mutex);
	}

	/* wait for termination of remaining threads */
	pthread_join(thread_wdog, NULL);
#if AGENT_IS_THREAD
	_purge_agent_args(agent_arg_ptr);
		xfree(agent_info_ptr->thread_struct);
		xfree(agent_info_ptr);
/* Basic validity test of agent argument */
static int _valid_agent_arg(agent_arg_t *agent_arg_ptr)
{
	xassert(agent_arg_ptr);
	xassert(agent_arg_ptr->slurm_addr);
	xassert(agent_arg_ptr->node_names);
	xassert((agent_arg_ptr->msg_type == SRUN_PING) ||
		(agent_arg_ptr->msg_type == SRUN_TIMEOUT) || 
		(agent_arg_ptr->msg_type == SRUN_NODE_FAIL) || 
		(agent_arg_ptr->msg_type == REQUEST_KILL_JOB) || 
		(agent_arg_ptr->msg_type == REQUEST_KILL_TIMELIMIT) || 
		(agent_arg_ptr->msg_type == REQUEST_UPDATE_JOB_TIME) ||
		(agent_arg_ptr->msg_type == REQUEST_KILL_TASKS) || 
		(agent_arg_ptr->msg_type == REQUEST_PING) || 
		(agent_arg_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH) || 
		(agent_arg_ptr->msg_type == REQUEST_SHUTDOWN) || 
		(agent_arg_ptr->msg_type == REQUEST_RECONFIGURE) ||
	        (agent_arg_ptr->msg_type == RESPONSE_RESOURCE_ALLOCATION) ||	
		(agent_arg_ptr->msg_type == REQUEST_NODE_REGISTRATION_STATUS));

	if (agent_arg_ptr->node_count == 0)
		return SLURM_FAILURE;	/* no messages to be sent */
	return SLURM_SUCCESS;
}

static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr)
{
	int i;
	agent_info_t *agent_info_ptr;
	thd_t *thread_ptr;

	agent_info_ptr = xmalloc(sizeof(agent_info_t));

	slurm_mutex_init(&agent_info_ptr->thread_mutex);
	if (pthread_cond_init(&agent_info_ptr->thread_cond, NULL))
		fatal("pthread_cond_init error %m");
	agent_info_ptr->thread_count   = agent_arg_ptr->node_count;
	agent_info_ptr->retry          = agent_arg_ptr->retry;
	agent_info_ptr->threads_active = 0;
	thread_ptr = xmalloc(agent_arg_ptr->node_count * sizeof(thd_t));
	agent_info_ptr->thread_struct  = thread_ptr;
	agent_info_ptr->msg_type       = agent_arg_ptr->msg_type;
	agent_info_ptr->msg_args_pptr  = &agent_arg_ptr->msg_args;
	if ((agent_arg_ptr->msg_type != REQUEST_SHUTDOWN) &&
	    (agent_arg_ptr->msg_type != REQUEST_RECONFIGURE))
		agent_info_ptr->get_reply = true;
	for (i = 0; i < agent_info_ptr->thread_count; i++) {
		thread_ptr[i].state      = DSH_NEW;
		thread_ptr[i].slurm_addr = agent_arg_ptr->slurm_addr[i];
		strncpy(thread_ptr[i].node_name,
			&agent_arg_ptr->node_names[i * MAX_NAME_LEN],
			MAX_NAME_LEN);
	}

	return agent_info_ptr;
}

static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx)
{
	task_info_t *task_info_ptr;
	task_info_ptr = xmalloc(sizeof(task_info_t));

	task_info_ptr->thread_mutex_ptr  = &agent_info_ptr->thread_mutex;
	task_info_ptr->thread_cond_ptr   = &agent_info_ptr->thread_cond;
	task_info_ptr->threads_active_ptr= &agent_info_ptr->threads_active;
	task_info_ptr->thread_struct_ptr = &agent_info_ptr->thread_struct[inx];
	task_info_ptr->get_reply         = agent_info_ptr->get_reply;
	task_info_ptr->msg_type          = agent_info_ptr->msg_type;
	task_info_ptr->msg_args_ptr      = *agent_info_ptr->msg_args_pptr;

	return task_info_ptr;
}

 * _wdog - Watchdog thread. Send SIGALRM to threads which have been active 
 *	for too long. 
 * IN args - pointer to agent_info_t with info on threads to watch
 * Sleep for WDOG_POLL seconds between polls.
static void *_wdog(void *args)
	int fail_cnt, no_resp_cnt, retry_cnt;
	bool work_done, srun_agent = false;
	agent_info_t *agent_ptr = (agent_info_t *) args;
	thd_t *thread_ptr = agent_ptr->thread_struct;

	if ( (agent_ptr->msg_type == SRUN_PING) ||
	     (agent_ptr->msg_type == SRUN_TIMEOUT) ||
	     (agent_ptr->msg_type == RESPONSE_RESOURCE_ALLOCATION) ||
	     (agent_ptr->msg_type == SRUN_NODE_FAIL) )
		srun_agent = true;
		work_done   = true;	/* assume all threads complete */
		fail_cnt    = 0;	/* assume no threads failures */
		no_resp_cnt = 0;	/* assume all threads respond */
		retry_cnt   = 0;	/* assume no required retries */
		slurm_mutex_lock(&agent_ptr->thread_mutex);
		for (i = 0; i < agent_ptr->thread_count; i++) {
			switch (thread_ptr[i].state) {
				if (thread_ptr[i].end_time <= now) {
					debug3("agent thread %lu timed out\n", 
					       (unsigned long) thread_ptr[i].thread);
					if (pthread_kill(thread_ptr[i].thread,
						thread_ptr[i].state = DSH_NO_RESP;
				if (max_delay < (int)thread_ptr[i].end_time)
					max_delay = (int)thread_ptr[i].end_time;
			case DSH_NO_RESP:
				no_resp_cnt++;
			case DSH_FAILED:
				fail_cnt++;
				break;
		slurm_mutex_unlock(&agent_ptr->thread_mutex);
	if (srun_agent)
		_notify_slurmctld_jobs(agent_ptr);
	else
		_notify_slurmctld_nodes(agent_ptr, no_resp_cnt, retry_cnt);

	if (max_delay)
		debug2("agent maximum delay %d seconds", max_delay);

	slurm_mutex_unlock(&agent_ptr->thread_mutex);
	return (void *) NULL;
}

static void _notify_slurmctld_jobs(agent_info_t *agent_ptr)
{
#if AGENT_IS_THREAD
	/* Locks: Write job */
	slurmctld_lock_t job_write_lock =
	    { NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK };
	uint32_t job_id = 0, step_id = 0;
	thd_t *thread_ptr = agent_ptr->thread_struct;

	if        (agent_ptr->msg_type == SRUN_PING) {
		srun_ping_msg_t *msg = *agent_ptr->msg_args_pptr;
		job_id  = msg->job_id;
		step_id = msg->step_id;
	} else if (agent_ptr->msg_type == SRUN_TIMEOUT) {
		srun_timeout_msg_t *msg = *agent_ptr->msg_args_pptr;
		job_id  = msg->job_id;
		step_id = msg->step_id;
	} else if (agent_ptr->msg_type == SRUN_NODE_FAIL) {
		srun_node_fail_msg_t *msg = *agent_ptr->msg_args_pptr;
		job_id  = msg->job_id;
		step_id = msg->step_id;
	} else if (agent_ptr->msg_type == RESPONSE_RESOURCE_ALLOCATION) {
		resource_allocation_response_msg_t *msg =
			*agent_ptr->msg_args_pptr;
		job_id  = msg->job_id;
		step_id = NO_VAL;
	} else {
		error("_notify_slurmctld_jobs invalid msg_type %u",
			agent_ptr->msg_type);
		return;
	}

	lock_slurmctld(job_write_lock);
	if  (thread_ptr[0].state == DSH_DONE)
		srun_response(job_id, step_id);
	unlock_slurmctld(job_write_lock);
#else
	fatal("Code development needed here if agent is not thread");
#endif
}

static void _notify_slurmctld_nodes(agent_info_t *agent_ptr, 
		int no_resp_cnt, int retry_cnt)
{
#if AGENT_IS_THREAD
	/* Locks: Read config, write job, write node */
	slurmctld_lock_t node_write_lock =
	    { READ_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
#endif
	thd_t *thread_ptr = agent_ptr->thread_struct;
	int i;

	/* Notify slurmctld of non-responding nodes */
#if AGENT_IS_THREAD
		/* Update node table data for non-responding nodes */
		lock_slurmctld(node_write_lock);
		for (i = 0; i < agent_ptr->thread_count; i++) {
			if (thread_ptr[i].state == DSH_NO_RESP)
				node_not_resp(thread_ptr[i].node_name,
				              thread_ptr[i].start_time);
		if (agent_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH) {
			/* Requeue the request */
			batch_job_launch_msg_t *launch_msg_ptr = 
					*agent_ptr->msg_args_pptr;
			uint32_t job_id = launch_msg_ptr->job_id;
			info("Non-responding node, requeue JobId=%u", job_id);
			job_complete(job_id, 0, true, 0);
		}
		unlock_slurmctld(node_write_lock);
		fatal("Code development needed here if agent is not thread");
	if (retry_cnt && agent_ptr->retry)
		_queue_agent_retry(agent_ptr, retry_cnt);

	/* Update last_response on responding nodes */
	lock_slurmctld(node_write_lock);
	for (i = 0; i < agent_ptr->thread_count; i++) {
		if (thread_ptr[i].state == DSH_FAILED)
			set_node_down(thread_ptr[i].node_name, 
			              "Prolog/epilog failure");
		if (thread_ptr[i].state == DSH_DONE)
			node_did_resp(thread_ptr[i].node_name);
	unlock_slurmctld(node_write_lock);

	if (run_scheduler) {
		run_scheduler = false;
		schedule();	/* has own locks */
	}
	if ((agent_ptr->msg_type == REQUEST_PING) ||
	    (agent_ptr->msg_type == REQUEST_NODE_REGISTRATION_STATUS))
	fatal("Code development needed here if agent is not thread");
/* Report a communications error for specified node */
static inline void _comm_err(char *node_name)
{
#if AGENT_IS_THREAD
	if (is_node_resp (node_name))
#endif
		error("agent/send_recv_msg: %s: %m", node_name);
}

 * _thread_per_node_rpc - thread to issue an RPC on a collection of nodes
 * IN/OUT args - pointer to task_info_t, xfree'd on completion
 */
static void *_thread_per_node_rpc(void *args)
	int rc = SLURM_SUCCESS, timeout = 0;
	task_info_t *task_ptr = (task_info_t *) args;
	thd_t *thread_ptr = task_ptr->thread_struct_ptr;
	state_t thread_state = DSH_NO_RESP;
	slurm_msg_type_t msg_type = task_ptr->msg_type;
	/* Locks: Write job, write node */
	slurmctld_lock_t job_write_lock = { 
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
	/* Locks: Read job */
	slurmctld_lock_t job_read_lock = {
		NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
	is_kill_msg = (	(msg_type == REQUEST_KILL_TIMELIMIT) ||
			(msg_type == REQUEST_KILL_JOB)     );
	srun_agent = (	(msg_type == SRUN_PING)    ||
			(msg_type == SRUN_TIMEOUT) ||
			(msg_type == RESPONSE_RESOURCE_ALLOCATION) ||
	thread_ptr->start_time = time(NULL);

	/* don't try to communicate with defunct job */
#if AGENT_IS_THREAD
	if (srun_agent) {
		uint32_t          job_id   = 0;
		enum job_states    state   = JOB_END;
		struct job_record *job_ptr = NULL;

		if (msg_type == SRUN_PING) {
			srun_ping_msg_t *msg = task_ptr->msg_args_ptr;
			job_id  = msg->job_id;
		} else if (msg_type == SRUN_TIMEOUT) {
			srun_timeout_msg_t *msg = task_ptr->msg_args_ptr;
			job_id  = msg->job_id;
		} else if (msg_type == SRUN_NODE_FAIL) {
			srun_node_fail_msg_t *msg = task_ptr->msg_args_ptr;
			job_id  = msg->job_id;
		} else if (msg_type == RESPONSE_RESOURCE_ALLOCATION) {
			resource_allocation_response_msg_t *msg = 
				task_ptr->msg_args_ptr;
			job_id  = msg->job_id;
		}
		lock_slurmctld(job_read_lock);
		if (job_id)
			job_ptr = find_job_record(job_id);
		if (job_ptr)
			state = job_ptr->job_state;	
		unlock_slurmctld(job_read_lock);
		if ((state == JOB_RUNNING) ||
		    ((state & JOB_COMPLETING) && (msg_type == SRUN_NODE_FAIL))) {
			; /* proceed with the communication */
		} else {	
			thread_state = DSH_DONE;
			goto cleanup;
		}
	}
#endif

	slurm_mutex_lock(task_ptr->thread_mutex_ptr);
	thread_ptr->state = DSH_ACTIVE;
	slurm_mutex_unlock(task_ptr->thread_mutex_ptr);

	thread_ptr->end_time = thread_ptr->start_time + COMMAND_TIMEOUT;
		if (slurm_send_recv_rc_msg(&msg, &rc, timeout) < 0) {
			if (!srun_agent)
				_comm_err(thread_ptr->node_name);
		if (slurm_send_only_node_msg(&msg) < 0) {
			if (!srun_agent)
				_comm_err(thread_ptr->node_name);
		} else
#if AGENT_IS_THREAD
	/* SPECIAL CASE: Mark node as IDLE if job already complete */
	if (is_kill_msg && (rc == ESLURMD_KILL_JOB_ALREADY_COMPLETE)) {
		kill_job_msg_t *kill_job;
		kill_job = (kill_job_msg_t *) task_ptr->msg_args_ptr;
		rc = SLURM_SUCCESS;
		lock_slurmctld(job_write_lock);
		if (job_epilog_complete(kill_job->job_id, 
		                        thread_ptr->node_name, rc))
			run_scheduler = true;
		unlock_slurmctld(job_write_lock);
	}

	/* SPECIAL CASE: Kill non-startable batch job */
	if ((msg_type == REQUEST_BATCH_JOB_LAUNCH) && rc) {
		batch_job_launch_msg_t *launch_msg_ptr = task_ptr->msg_args_ptr;
		uint32_t job_id = launch_msg_ptr->job_id;
		info("Killing non-startable batch job %u: %s", 
			job_id, slurm_strerror(rc));
		thread_state = DSH_DONE;
		lock_slurmctld(job_write_lock);
		job_complete(job_id, 0, false, 1);
		unlock_slurmctld(job_write_lock);
		goto cleanup;
	}
/*		debug3("agent processed RPC to node %s", 
			thread_ptr->node_name); */
	case ESLURMD_EPILOG_FAILED:
		error("Epilog failure on host %s, setting DOWN", 
		      thread_ptr->node_name);
		thread_state = DSH_FAILED;
	case ESLURMD_PROLOG_FAILED:
		error("Prolog failure on host %s, setting DOWN",
		      thread_ptr->node_name);
		thread_state = DSH_FAILED;
		break;
	case ESLURM_INVALID_JOB_ID:  /* Not indicative of a real error */
	case ESLURMD_JOB_NOTRUNNING: /* Not indicative of a real error */
		debug2("agent processed RPC to node %s: %s",
		       thread_ptr->node_name, slurm_strerror(rc));
		error("agent error from host %s for msg type %d: %s", 
		      thread_ptr->node_name, task_ptr->msg_type, 
		      slurm_strerror(rc));
      cleanup:
	slurm_mutex_lock(task_ptr->thread_mutex_ptr);
	thread_ptr->state = thread_state;
	thread_ptr->end_time = (time_t) difftime(time(NULL), 
						thread_ptr->start_time);

	/* Signal completion so another thread can replace us */
	(*task_ptr->threads_active_ptr)--;
	slurm_mutex_unlock(task_ptr->thread_mutex_ptr);
	pthread_cond_signal(task_ptr->thread_cond_ptr);
	xfree(args);
	return (void *) NULL;
 * SIGALRM handler.  We are really interested in interrupting hung communictions
 * and causing them to return EINTR. Multiple interupts might be required.
Moe Jette's avatar
Moe Jette committed
static void _alarm_handler(int dummy)
	xsignal(SIGALRM, _alarm_handler);
/*
 * _queue_agent_retry - Queue any failed RPCs for later replay
 * IN agent_info_ptr - pointer to info on completed agent requests
 * IN count - number of agent requests which failed, count to requeue
 */
static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count)
{
	agent_arg_t *agent_arg_ptr;
	queued_request_t *queued_req_ptr = NULL;
	thd_t *thread_ptr = agent_info_ptr->thread_struct;
	int i, j;

	if (count == 0)
		return;

	/* build agent argument with just the RPCs to retry */
	agent_arg_ptr = xmalloc(sizeof(agent_arg_t));
	agent_arg_ptr->node_count = count;
	agent_arg_ptr->retry = 1;
	agent_arg_ptr->slurm_addr = xmalloc(sizeof(struct sockaddr_in)
					    * count);
	agent_arg_ptr->node_names = xmalloc(MAX_NAME_LEN * count);
	agent_arg_ptr->msg_type = agent_info_ptr->msg_type;
	agent_arg_ptr->msg_args = *(agent_info_ptr->msg_args_pptr);
	*(agent_info_ptr->msg_args_pptr) = NULL;
	for (i = 0; i < agent_info_ptr->thread_count; i++) {
		if (thread_ptr[i].state != DSH_NO_RESP)
			continue;
		agent_arg_ptr->slurm_addr[j] = thread_ptr[i].slurm_addr;
		strncpy(&agent_arg_ptr->node_names[j * MAX_NAME_LEN],
			thread_ptr[i].node_name, MAX_NAME_LEN);
		if ((++j) == count)
			break;
	}
	if (count != j) {
		error("agent: Retry count (%d) != actual count (%d)", 
			count, j);
		agent_arg_ptr->node_count = j;
	}
	debug2("Queue RPC msg_type=%u, nodes=%d for retry", 
	       agent_arg_ptr->msg_type, j);

	/* add the requeust to a list */
	queued_req_ptr = xmalloc(sizeof(queued_request_t));
	queued_req_ptr->agent_arg_ptr = agent_arg_ptr;
	queued_req_ptr->last_attempt  = time(NULL);
	slurm_mutex_lock(&retry_mutex);
	if (retry_list == NULL) {
		retry_list = list_create(&_list_delete_retry);
		if (retry_list == NULL)
			fatal("list_create failed");
	if (list_append(retry_list, (void *) queued_req_ptr) == 0)
		fatal("list_append failed");
	slurm_mutex_unlock(&retry_mutex);
Moe Jette's avatar
Moe Jette committed
 * _list_delete_retry - delete an entry from the retry list, 
 *	see common/list.h for documentation
 */
static void _list_delete_retry(void *retry_entry)
	queued_request_t *queued_req_ptr;
	if (! retry_entry)
		return;

	queued_req_ptr = (queued_request_t *) retry_entry;
	_purge_agent_args(queued_req_ptr->agent_arg_ptr);
	xfree(queued_req_ptr);
 * agent_retry - Agent for retrying pending RPCs. One pending request is 
 *	issued if it has been pending for at least min_wait seconds
 * IN min_wait - Minimum wait time between re-issue of a pending RPC
 * RET count of queued requests remaining (zero if none are old)
extern int agent_retry (int min_wait)
	time_t now = time(NULL);
	queued_request_t *queued_req_ptr = NULL;
	slurm_mutex_lock(&retry_mutex);
		double age = 0;
		queued_req_ptr = (queued_request_t *) list_peek(retry_list);
		if (queued_req_ptr) {
			age = difftime(now, queued_req_ptr->last_attempt);
				queued_req_ptr = (queued_request_t *) 
					list_pop(retry_list);
				list_size = list_count(retry_list);;
			} else /* too new */
	slurm_mutex_unlock(&retry_mutex);
	if (queued_req_ptr) {
		agent_arg_t *agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
		xfree(queued_req_ptr);
		if (agent_arg_ptr)
			_spawn_retry_agent(agent_arg_ptr);
		else
			error("agent_retry found record with no agent_args");
	}
 * agent_queue_request - put a new request on the queue for later execution
 * IN agent_arg_ptr - the request to enqueue
 */
void agent_queue_request(agent_arg_t *agent_arg_ptr)
{
	queued_request_t *queued_req_ptr = NULL;

	queued_req_ptr = xmalloc(sizeof(queued_request_t));
	queued_req_ptr->agent_arg_ptr = agent_arg_ptr;
/*	queued_req_ptr->last_attempt  = 0; Implicit */

	slurm_mutex_lock(&retry_mutex);
	if (retry_list == NULL) {
		retry_list = list_create(&_list_delete_retry);
		if (retry_list == NULL)
			fatal("list_create failed");
	}
	list_prepend(retry_list, (void *)queued_req_ptr);
/* _spawn_retry_agent - pthread_create an agent for the given task */
static void _spawn_retry_agent(agent_arg_t * agent_arg_ptr)
	pthread_attr_t attr_agent;
	pthread_t thread_agent;

	if (agent_arg_ptr == NULL)
		return;

	debug2("Spawning RPC agent for msg_type %u", 
	if (pthread_attr_init(&attr_agent))
		fatal("pthread_attr_init error %m");
	if (pthread_attr_setdetachstate(&attr_agent,
					PTHREAD_CREATE_DETACHED))
		error("pthread_attr_setdetachstate error %m");
#ifdef PTHREAD_SCOPE_SYSTEM
	if (pthread_attr_setscope(&attr_agent, PTHREAD_SCOPE_SYSTEM))
		error("pthread_attr_setscope error %m");
	while (pthread_create(&thread_agent, &attr_agent,
			agent, (void *) agent_arg_ptr)) {
		error("pthread_create error %m");
		if (++retries > MAX_RETRIES)
			fatal("Can't create pthread");
		sleep(1);	/* sleep and try again */
Moe Jette's avatar
Moe Jette committed
/* _slurmctld_free_job_launch_msg is a variant of slurm_free_job_launch_msg
 *	because all environment variables currently loaded in one xmalloc 
 *	buffer (see get_job_env()), which is different from how slurmd 
 *	assembles the data from a message
 */
static void _slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg)
{
	if (msg) {
		if (msg->environment) {
			xfree(msg->environment[0]);
			xfree(msg->environment);
		slurm_free_job_launch_msg(msg);
/* agent_purge - purge all pending RPC requests */
		return;

	slurm_mutex_lock(&retry_mutex);
	list_destroy(retry_list);
	retry_list = NULL;
	slurm_mutex_unlock(&retry_mutex);
}

static void _purge_agent_args(agent_arg_t *agent_arg_ptr)
{
	if (agent_arg_ptr == NULL)
		return;

	xfree(agent_arg_ptr->slurm_addr);
	xfree(agent_arg_ptr->node_names);
	if (agent_arg_ptr->msg_args) {
		if (agent_arg_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH)
			_slurmctld_free_job_launch_msg(agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == 
				RESPONSE_RESOURCE_ALLOCATION)
			slurm_free_resource_allocation_response_msg(
					agent_arg_ptr->msg_args);
		else
			xfree(agent_arg_ptr->msg_args);