/*****************************************************************************\
 *  agent.c - parallel background communication functions. This is where  
 *	logic could be placed for broadcast communications.
 *****************************************************************************
 *  Copyright (C) 2002 The Regents of the University of California.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Moe Jette <jette@llnl.gov>, et al.
 *  Derived from pdsh written by Jim Garlick <garlick1@llnl.gov>
 *  UCRL-CODE-2002-040.
 *  
 *  This file is part of SLURM, a resource management program.
 *  For details, see <http://www.llnl.gov/linux/slurm/>.
 *  
 *  SLURM is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *  
 *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *  
 *  You should have received a copy of the GNU General Public License along
 *  with SLURM; if not, write to the Free Software Foundation, Inc.,
 *  59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
 *****************************************************************************
 *  Theory of operation:
 *
 *  The functions below permit slurm to initiate parallel tasks as a 
 *  detached thread and let the functions below make sure the work happens. 
 *  For example, when a job step completes slurmctld needs to revoke credentials 
 *  for that job step on every node to which it was allocated. We don't want to 
 *  hang slurmctld's primary function (the job complete RPC) to perform this 
 *  work, so it just initiates an agent to perform the work. The agent is passed 
 *  all details required to perform the work, so it will be possible to execute
 *  the agent as a pthread, process, or even a daemon on some other computer.
 *  The main agent thread creates a separate thread for each node to be
 *  communicated with, up to AGENT_THREAD_COUNT active at any one time. A
 *  special watchdog thread sends SIGALRM to any threads that have been
 *  active (in DSH_ACTIVE state) for more than COMMAND_TIMEOUT seconds.
 *  The agent responds to slurmctld via an RPC as required.
 *  For example, informing slurmctld that some node is not responding.
 *  All the state for each thread is maintained in a thd_t struct, which is
 *  used by the watchdog thread as well as the communication threads.
\*****************************************************************************/
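
/*****************************************************************************\
 *  Example usage (a sketch for illustration only, not part of the build):
 *  a caller in slurmctld, such as the job step completion logic, might
 *  dispatch work to the agent as shown below. The agent_arg_t fields match
 *  those tested in agent() below; node_cnt, addr_array, name_array, and
 *  revoke_msg are hypothetical placeholders for caller-supplied data.
 *
 *	agent_arg_t *agent_args = xmalloc (sizeof (agent_arg_t));
 *	pthread_attr_t attr_agent;
 *	pthread_t thread_agent;
 *
 *	agent_args->addr_count = node_cnt;
 *	agent_args->slurm_addr = addr_array;	- one sockaddr_in per node
 *	agent_args->node_names = name_array;	- MAX_NAME_LEN bytes per node
 *	agent_args->msg_type   = REQUEST_REVOKE_JOB_CREDENTIAL;
 *	agent_args->msg_args   = revoke_msg;
 *
 *	pthread_attr_init (&attr_agent);
 *	pthread_attr_setdetachstate (&attr_agent, PTHREAD_CREATE_DETACHED);
 *	if (pthread_create (&thread_agent, &attr_agent, agent,
 *	                    (void *) agent_args))
 *		error ("pthread_create error %m");
 *
 *  With AGENT_IS_THREAD set, agent() xfree's agent_args and everything it
 *  points to upon completion, so the caller must not touch them again once
 *  pthread_create() succeeds.
\*****************************************************************************/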

#ifdef HAVE_CONFIG_H
#  include <config.h>
#endif

#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <string.h>
#include <src/common/log.h>
#include <src/common/slurm_protocol_defs.h>
#include <src/common/xmalloc.h>
#include <src/common/xstring.h>
#include <src/slurmctld/agent.h>
#include <src/slurmctld/locks.h>
#if COMMAND_TIMEOUT == 1
#define WDOG_POLL 		1	/* secs */
#else
#define WDOG_POLL 		2	/* secs */
#endif

typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t;
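
/* A thread's life cycle: it is created in DSH_NEW, marked DSH_ACTIVE by
 * thread_per_node_rpc() when its RPC begins, and ends as DSH_DONE on a
 * successful reply or DSH_FAILED on any error (the watchdog's SIGALRM is
 * intended to break a hung thread out of its RPC so that it terminates
 * in DSH_FAILED). */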

typedef struct thd {
        pthread_t	thread;			/* thread ID */
        pthread_attr_t	attr;			/* thread attributes */
        state_t		state;      		/* thread state */
        time_t 		time;   		/* time stamp for start or delta time */
	struct sockaddr_in slurm_addr;		/* network address */
	char		node_name[MAX_NAME_LEN];/* node's name */
} thd_t;

typedef struct agent_info {
	pthread_mutex_t	thread_mutex;		/* agent specific mutex */
	pthread_cond_t	thread_cond;		/* agent specific condition */
	uint32_t	thread_count;		/* number of threads records */
	uint32_t	threads_active;		/* count of currently active threads */
	thd_t 		*thread_struct;		/* thread structures */
	slurm_msg_type_t msg_type;		/* RPC to be issued */
	void		*msg_args;		/* RPC data to be used */
} agent_info_t;

typedef struct task_info {
	pthread_mutex_t	*thread_mutex_ptr;	/* pointer to agent specific mutex */
	pthread_cond_t	*thread_cond_ptr;	/* pointer to agent specific condition */
	uint32_t	*threads_active_ptr;	/* pointer to count of currently active threads */
	thd_t 		*thread_struct_ptr;	/* pointer to thread structures */
	slurm_msg_type_t msg_type;		/* RPC to be issued */
	void		*msg_args_ptr;		/* pointer to RPC data to be used */
} task_info_t;
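
/* Each communication thread receives its own task_info_t (xfree'd by
 * thread_per_node_rpc() itself); the mutex, condition, and active-count
 * members point back into the parent agent's shared agent_info_t. */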
static void *thread_per_node_rpc (void *args);
static void *wdog (void *args);

/*
 * agent - party responsible for transmitting a common RPC in parallel across a set 
 *	of nodes
 * input: pointer to agent_arg_t, which is xfree'd (including slurm_addr, node_names, 
 *	and msg_args) upon completion if AGENT_IS_THREAD is set
 */
void *
agent (void *args)
{
	int i, rc;
	pthread_attr_t attr_wdog;
	pthread_t thread_wdog;
	agent_arg_t *agent_arg_ptr = args;
	agent_info_t *agent_info_ptr = NULL;
	thd_t *thread_ptr;
	task_info_t *task_specific_ptr;
	/* basic argument value tests */
	if (agent_arg_ptr == NULL)
		fatal ("agent NULL argument");
	if (agent_arg_ptr->addr_count == 0)
		goto cleanup;	/* no messages to be sent */
	if (agent_arg_ptr->slurm_addr == NULL)
		fatal ("agent passed NULL address list");
	if (agent_arg_ptr->node_names == NULL)
		fatal ("agent passed NULL node name list");
	if (agent_arg_ptr->msg_type != REQUEST_REVOKE_JOB_CREDENTIAL)
		fatal ("agent passed invalid message type %d", agent_arg_ptr->msg_type);

	/* initialize the data structures */
	agent_info_ptr = xmalloc (sizeof (agent_info_t));
	if (pthread_mutex_init (&agent_info_ptr->thread_mutex, NULL))
		fatal (" pthread_mutex_init error %m");
	if (pthread_cond_init (&agent_info_ptr->thread_cond, NULL))
		fatal ("pthread_cond_init error %m");
	agent_info_ptr->thread_count = agent_arg_ptr->addr_count;
	agent_info_ptr->threads_active = 0;
	thread_ptr = xmalloc (agent_arg_ptr->addr_count * sizeof (thd_t));
	agent_info_ptr->thread_struct = thread_ptr;
	agent_info_ptr->msg_type = agent_arg_ptr->msg_type;
	agent_info_ptr->msg_args = agent_arg_ptr->msg_args;
	for (i = 0; i < agent_info_ptr->thread_count; i++) {
		thread_ptr[i].state = DSH_NEW;
		thread_ptr[i].slurm_addr = agent_arg_ptr->slurm_addr[i];
		strncpy (thread_ptr[i].node_name,
		         &agent_arg_ptr->node_names[i*MAX_NAME_LEN], MAX_NAME_LEN);

	/* start the watchdog thread */
	if (pthread_attr_init (&attr_wdog))
		fatal ("pthread_attr_init error %m");
	if (pthread_attr_setdetachstate (&attr_wdog, PTHREAD_CREATE_JOINABLE))
		error ("pthread_attr_setdetachstate error %m");
#ifdef PTHREAD_SCOPE_SYSTEM
	if (pthread_attr_setscope (&attr_wdog, PTHREAD_SCOPE_SYSTEM))
		error ("pthread_attr_setscope error %m");
	if (pthread_create (&thread_wdog, &attr_wdog, wdog, (void *)agent_info_ptr)) {
		error ("pthread_create error %m");
		sleep (1); /* sleep and try once more */
		if (pthread_create (&thread_wdog, &attr_wdog, wdog, args))
			fatal ("pthread_create error %m");
	/* start all the other threads (up to AGENT_THREAD_COUNT active at once) */
	for (i = 0; i < agent_info_ptr->thread_count; i++) {
		
		/* wait until "room" for another thread */	
		pthread_mutex_lock (&agent_info_ptr->thread_mutex);
#if 		AGENT_THREAD_COUNT < 1
		fatal ("AGENT_THREAD_COUNT value is invalid");
#endif
     		while (agent_info_ptr->threads_active >= AGENT_THREAD_COUNT) {
			pthread_cond_wait (&agent_info_ptr->thread_cond, 
			                   &agent_info_ptr->thread_mutex);
		}
		/* create thread; its task_info_t argument is freed by thread_per_node_rpc() */
		task_specific_ptr 			= xmalloc (sizeof (task_info_t));
		task_specific_ptr->thread_mutex_ptr	= &agent_info_ptr->thread_mutex;
		task_specific_ptr->thread_cond_ptr	= &agent_info_ptr->thread_cond;
		task_specific_ptr->threads_active_ptr	= &agent_info_ptr->threads_active;
		task_specific_ptr->thread_struct_ptr	= &thread_ptr[i];
		task_specific_ptr->msg_type		= agent_info_ptr->msg_type;
		task_specific_ptr->msg_args_ptr		= agent_info_ptr->msg_args;
		if (pthread_attr_init (&thread_ptr[i].attr))
			fatal ("pthread_attr_init error %m");
		if (pthread_attr_setdetachstate (&thread_ptr[i].attr, PTHREAD_CREATE_DETACHED))
			error ("pthread_attr_setdetachstate error %m");
		if (pthread_attr_setscope (&thread_ptr[i].attr, PTHREAD_SCOPE_SYSTEM))
			error ("pthread_attr_setscope error %m");
		while ( (rc = pthread_create (&thread_ptr[i].thread, 
		                              &thread_ptr[i].attr, 
		                              thread_per_node_rpc, 
		                              (void *) task_specific_ptr)) ) {
			error ("pthread_create error %m");
			if (agent_info_ptr->threads_active)
				pthread_cond_wait (&agent_info_ptr->thread_cond, 
				                   &agent_info_ptr->thread_mutex);
			else {
				pthread_mutex_unlock (&agent_info_ptr->thread_mutex);
				sleep (1);
				pthread_mutex_lock (&agent_info_ptr->thread_mutex);

		agent_info_ptr->threads_active++;
		pthread_mutex_unlock (&agent_info_ptr->thread_mutex);
        }

	/* wait for termination of remaining threads */
	pthread_join (thread_wdog, NULL);

cleanup:
#if AGENT_IS_THREAD
	if (agent_arg_ptr) {
		if (agent_arg_ptr->slurm_addr)
			xfree (agent_arg_ptr->slurm_addr);
		if (agent_arg_ptr->node_names)
			xfree (agent_arg_ptr->node_names);
		if (agent_arg_ptr->msg_args)
			xfree (agent_arg_ptr->msg_args);
		xfree (agent_arg_ptr);
	}
#endif
	if (agent_info_ptr) {
		if (agent_info_ptr->thread_struct)
			xfree (agent_info_ptr->thread_struct);
		xfree (agent_info_ptr);
	}
	return NULL;
}

/* 
 * wdog - Watchdog thread. Send SIGALRM to threads which have been active for too long.
 *	Sleep for WDOG_POLL seconds between polls.
 */
static void *
wdog (void *args)
{
	int i, fail_cnt, work_done, delay, max_delay = 0;
	agent_info_t *agent_ptr = (agent_info_t *) args;
	thd_t *thread_ptr = agent_ptr->thread_struct;
#if AGENT_IS_THREAD
	/* Locks: Write node */
	slurmctld_lock_t node_write_lock = { NO_LOCK, NO_LOCK, WRITE_LOCK, NO_LOCK };
#else
	struct sockaddr_in *slurm_addr;
#endif

	while (1) {
		work_done = 1;	/* assume all threads complete for now */
		fail_cnt  = 0;	/* assume all threads complete successfully for now */
		sleep (WDOG_POLL);

		pthread_mutex_lock (&agent_ptr->thread_mutex);
		for (i = 0; i < agent_ptr->thread_count; i++) {
			switch (thread_ptr[i].state) {
				case DSH_ACTIVE:
					delay = difftime (time (NULL), thread_ptr[i].time);
					if ( delay >= COMMAND_TIMEOUT)
						pthread_kill(thread_ptr[i].thread, SIGALRM);
					break;
				case DSH_NEW:
					break;
				case DSH_DONE:
					if ( max_delay < (int) thread_ptr[i].time )
						max_delay = (int) thread_ptr[i].time;
				case DSH_FAILED:
		pthread_mutex_unlock (&agent_ptr->thread_mutex);

	/* Notify slurmctld of non-responding nodes */
	if (fail_cnt) {
#if AGENT_IS_THREAD
		/* Update node table data for non-responding nodes */
		lock_slurmctld (node_write_lock);
		for (i = 0; i < agent_ptr->thread_count; i++) {
			if (thread_ptr[i].state == DSH_FAILED)
				node_not_resp (thread_ptr[i].node_name);
		}
		unlock_slurmctld (node_write_lock);
#else
		/* Build a list of all non-responding nodes and send it to slurmctld */
		error ("agent/wdog: %d nodes failed to respond", fail_cnt);
		slurm_addr = xmalloc (fail_cnt * sizeof (struct sockaddr_in));
		fail_cnt = 0;	/* reuse as an index into slurm_addr */
		for (i = 0; i < agent_ptr->thread_count; i++) {
			if (thread_ptr[i].state == DSH_FAILED)
				slurm_addr[fail_cnt++] = thread_ptr[i].slurm_addr;
		}
		fatal ("Code development needed here if agent is not thread");
		xfree (slurm_addr);
#endif
	}

	if (max_delay)
		debug ("agent maximum delay %d seconds", max_delay);
	pthread_mutex_unlock (&agent_ptr->thread_mutex);
	return (void *) NULL;
}

/* 
 * thread_per_node_rpc - thread to issue an RPC to a single node.
 *	This xfrees the argument passed to it.
 */
static void *
thread_per_node_rpc (void *args)
{
	sigset_t set;
	int msg_size ;
	int rc ;
	slurm_fd sockfd ;
	slurm_msg_t request_msg ;
	slurm_msg_t response_msg ;
	return_code_msg_t * slurm_rc_msg ;
	task_info_t *task_ptr = (task_info_t *) args;
	thd_t *thread_ptr;
	state_t thread_state = DSH_FAILED;

	if (task_ptr == NULL)
		fatal ("thread_per_node_rpc has NULL argument");
	thread_ptr = task_ptr->thread_struct_ptr;

	/* accept SIGALRM */
	if (sigemptyset (&set))
		error ("sigemptyset error: %m");
	if (sigaddset (&set, SIGALRM))
		error ("sigaddset error on SIGALRM: %m");
	if (sigprocmask (SIG_UNBLOCK, &set, NULL) != 0)
		error ("sigprocmask error: %m");

	pthread_mutex_lock (task_ptr->thread_mutex_ptr);
	thread_ptr->state = DSH_ACTIVE;
	thread_ptr->time = time (NULL);
	pthread_mutex_unlock (task_ptr->thread_mutex_ptr);
	/* init message connection for message communication */
	if ( ( sockfd = slurm_open_msg_conn (& thread_ptr->slurm_addr) ) == SLURM_SOCKET_ERROR ) {
		error ("thread_per_node_rpc/slurm_open_msg_conn error %m");
	request_msg . msg_type = task_ptr->msg_type ;
	request_msg . data = task_ptr->msg_args_ptr ; 
	if ( ( rc = slurm_send_node_msg ( sockfd , & request_msg ) ) == SLURM_SOCKET_ERROR ) {
		error ("thread_per_node_rpc/slurm_send_node_msg error %m");
		goto cleanup;
	}

	/* receive message */
	if ( ( msg_size = slurm_receive_msg ( sockfd , & response_msg ) ) == SLURM_SOCKET_ERROR ) {
		error ("thread_per_node_rpc/slurm_receive_msg error %m");
		goto cleanup;
	}

	/* shutdown message connection */
	if ( ( rc = slurm_shutdown_msg_conn ( sockfd ) ) == SLURM_SOCKET_ERROR ) {
		error ("thread_per_node_rpc/slurm_shutdown_msg_conn error %m");
		error ("thread_per_node_rpc/msg_size error %d", msg_size);
		goto cleanup;
	}

	switch ( response_msg . msg_type )
	{
		case RESPONSE_SLURM_RC:
			slurm_rc_msg = ( return_code_msg_t * ) response_msg . data ;
			rc = slurm_rc_msg->return_code;
			slurm_free_return_code_msg ( slurm_rc_msg );	
			if (rc)
				error ("thread_per_node_rpc/rc error %d", rc);
			else {
				debug3 ("agent successfully processed RPC to node %s", 
				        thread_ptr->node_name);
				thread_state = DSH_DONE;
			}
			break;
		default:
			error ("thread_per_node_rpc bad msg_type %d", response_msg.msg_type);
			break;
	}

cleanup:
	pthread_mutex_lock (task_ptr->thread_mutex_ptr);
	thread_ptr->state = thread_state;
	thread_ptr->time = (time_t) difftime (time (NULL), thread_ptr->time);

	/* Signal completion so another thread can replace us */
	(*task_ptr->threads_active_ptr)--;
	pthread_cond_signal(task_ptr->thread_cond_ptr);
	pthread_mutex_unlock (task_ptr->thread_mutex_ptr);
	xfree (args);
	return (void *) NULL;
}