/*****************************************************************************\
* agent.c - parallel background communication functions. This is where
* logic could be placed for broadcast communications.
*****************************************************************************
* Copyright (C) 2002 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 * Written by Moe Jette <jette@llnl.gov>, et al.
* Derived from pdsh written by Jim Garlick <garlick1@llnl.gov>
* UCRL-CODE-2002-040.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.llnl.gov/linux/slurm/>.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
*****************************************************************************
* Theory of operation:
*
 * The functions below permit slurmctld to initiate parallel tasks as a
 * detached thread and ensure that the work is carried out to completion.
* For example, when a job step completes slurmctld needs to revoke credentials
* for that job step on every node to which it was allocated. We don't want to
* hang slurmctld's primary function (the job complete RPC) to perform this
* work, so it just initiates an agent to perform the work. The agent is passed
* all details required to perform the work, so it will be possible to execute
 * the agent as a pthread, process, or even a daemon on some other computer.
* The main agent thread creates a separate thread for each node to be
* communicated with up to AGENT_THREAD_COUNT. A special watchdog thread sends
 * SIGALRM to any threads that have been active (in DSH_ACTIVE state) for more
* than COMMAND_TIMEOUT seconds.
 * The agent responds to slurmctld via an RPC as required, for example to
 * report that some node is not responding.
 * All of the state for each thread is maintained in a thd_t struct, which is
* used by the watchdog thread as well as the communication threads.
\*****************************************************************************/
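
/*
 * Illustrative usage sketch (kept out of the build with #if 0): how a
 * caller inside slurmctld might hand work to agent().  The function name
 * and field values below are assumptions for illustration only;
 * agent_arg_t itself is defined in src/slurmctld/agent.h.
 */
#if 0
static void example_spawn_agent (int node_cnt,
                                 struct sockaddr_in *addrs, char *names)
{
	pthread_attr_t attr_agent;
	pthread_t thread_agent;
	agent_arg_t *arg = xmalloc (sizeof (agent_arg_t));

	arg->node_count = node_cnt;	/* one RPC per node */
	arg->retry      = 1;		/* requeue any RPC that fails */
	arg->slurm_addr = addrs;	/* xfree'd by agent() on completion */
	arg->node_names = names;	/* MAX_NAME_LEN bytes per node */
	arg->msg_type   = REQUEST_PING;	/* one of the types agent() accepts */
	arg->msg_args   = NULL;		/* assumed: REQUEST_PING carries no body */

	/* agent() runs detached and frees arg when it is done */
	pthread_attr_init (&attr_agent);
	pthread_attr_setdetachstate (&attr_agent, PTHREAD_CREATE_DETACHED);
	pthread_create (&thread_agent, &attr_agent, agent, (void *) arg);
}
#endif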
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <src/common/list.h>
#include <src/common/log.h>
#include <src/common/slurm_protocol_api.h>
#include <src/common/slurm_protocol_defs.h>
#include <src/common/xmalloc.h>
#include <src/common/xstring.h>
#include <src/slurmctld/agent.h>
#include <src/slurmctld/locks.h>
#include <src/slurmctld/slurmctld.h>
#if COMMAND_TIMEOUT == 1
#define WDOG_POLL 1 /* secs */
#else
#define WDOG_POLL 2 /* secs */
#endif
typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t;
typedef struct thd {
pthread_t thread; /* thread ID */
pthread_attr_t attr; /* thread attributes */
state_t state; /* thread state */
time_t time; /* time stamp for start or delta time */
struct sockaddr_in slurm_addr; /* network address */
char node_name[MAX_NAME_LEN];/* node's name */
} thd_t;
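
/*
 * Thread life cycle: each thd_t starts in DSH_NEW, moves to DSH_ACTIVE when
 * thread_per_node_rpc() begins its RPC, and ends as DSH_DONE (response
 * received) or DSH_FAILED (error or SIGALRM timeout).  While DSH_ACTIVE,
 * thd_t.time holds the RPC start time; on completion it is overwritten with
 * the elapsed time in seconds.
 */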
typedef struct agent_info {
	pthread_mutex_t thread_mutex;	/* agent specific mutex */
	pthread_cond_t thread_cond;	/* agent specific condition */
	uint32_t thread_count;		/* number of thread records */
	uint32_t threads_active;	/* count of currently active threads */
	uint16_t retry;			/* if set, keep trying */
	thd_t *thread_struct;		/* thread structures */
	slurm_msg_type_t msg_type;	/* RPC to be issued */
	void **msg_args_pptr;		/* RPC data to be used */
} agent_info_t;
typedef struct task_info {
	pthread_mutex_t *thread_mutex_ptr; /* pointer to agent specific mutex */
	pthread_cond_t *thread_cond_ptr; /* pointer to agent specific condition */
	uint32_t *threads_active_ptr;	/* pointer to count of currently active threads */
	thd_t *thread_struct_ptr;	/* pointer to thread structures */
	slurm_msg_type_t msg_type;	/* RPC to be issued */
	void *msg_args_ptr;		/* pointer to RPC data to be used */
} task_info_t;
static void alarm_handler(int dummy);
static void list_delete_retry (void *retry_entry);
static void queue_agent_retry (agent_info_t *agent_info_ptr, int count);
static void slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg);
static void spawn_retry_agent (agent_arg_t *agent_arg_ptr);
static void *thread_per_node_rpc (void *args);
static void *wdog (void *args);
static void xsignal(int signal, void (*handler)(int));
/* retry RPC data structures */
pthread_mutex_t retry_mutex = PTHREAD_MUTEX_INITIALIZER;
List retry_list = NULL; /* agent_arg_t list for retry */
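
/*
 * Retry flow: queue_agent_retry() records the failed portion of an agent's
 * work on retry_list as a new agent_arg_t; agent_retry() replays the oldest
 * entry and retry_pending() replays every entry naming a given node.  All
 * access to retry_list is serialized by retry_mutex.
 */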
/*
 * agent - party responsible for transmitting a common RPC in parallel
 *	across a set of nodes
 * input: pointer to agent_arg_t, which is xfree'd (including slurm_addr,
 *	node_names, and msg_args) upon completion if AGENT_IS_THREAD is set
 */
void *
agent (void *args)
{
int i, rc;
pthread_attr_t attr_wdog;
pthread_t thread_wdog;
agent_arg_t *agent_arg_ptr = args;
agent_info_t *agent_info_ptr = NULL;
thd_t *thread_ptr;
task_info_t *task_specific_ptr;
/* basic argument value tests */
if (agent_arg_ptr == NULL)
fatal ("agent NULL argument");
if (agent_arg_ptr->node_count == 0)
goto cleanup; /* no messages to be sent */
if (agent_arg_ptr->slurm_addr == NULL)
fatal ("agent passed NULL address list");
if (agent_arg_ptr->node_names == NULL)
fatal ("agent passed NULL node name list");
if ((agent_arg_ptr->msg_type != REQUEST_REVOKE_JOB_CREDENTIAL) &&
(agent_arg_ptr->msg_type != REQUEST_NODE_REGISTRATION_STATUS) &&
(agent_arg_ptr->msg_type != REQUEST_PING) &&
(agent_arg_ptr->msg_type != REQUEST_BATCH_JOB_LAUNCH))
fatal ("agent passed invalid message type %d", agent_arg_ptr->msg_type);
/* initialize the data structures */
agent_info_ptr = xmalloc (sizeof (agent_info_t));
if (pthread_mutex_init (&agent_info_ptr->thread_mutex, NULL))
fatal (" pthread_mutex_init error %m");
if (pthread_cond_init (&agent_info_ptr->thread_cond, NULL))
fatal ("pthread_cond_init error %m");
agent_info_ptr->thread_count = agent_arg_ptr->node_count;
agent_info_ptr->retry = agent_arg_ptr->retry;
agent_info_ptr->threads_active = 0;
thread_ptr = xmalloc (agent_arg_ptr->node_count * sizeof (thd_t));
agent_info_ptr->thread_struct = thread_ptr;
agent_info_ptr->msg_type = agent_arg_ptr->msg_type;
agent_info_ptr->msg_args_pptr = &agent_arg_ptr->msg_args;
	for (i = 0; i < agent_info_ptr->thread_count; i++) {
		thread_ptr[i].state = DSH_NEW;	/* zero value, made explicit */
		thread_ptr[i].slurm_addr = agent_arg_ptr->slurm_addr[i];
		strncpy (thread_ptr[i].node_name,
		         &agent_arg_ptr->node_names[i*MAX_NAME_LEN], MAX_NAME_LEN);
	}
/* start the watchdog thread */
if (pthread_attr_init (&attr_wdog))
fatal ("pthread_attr_init error %m");
if (pthread_attr_setdetachstate (&attr_wdog, PTHREAD_CREATE_JOINABLE))
error ("pthread_attr_setdetachstate error %m");
if (pthread_attr_setscope (&attr_wdog, PTHREAD_SCOPE_SYSTEM))
error ("pthread_attr_setscope error %m");
if (pthread_create (&thread_wdog, &attr_wdog, wdog, (void *)agent_info_ptr)) {
error ("pthread_create error %m");
sleep (1); /* sleep and try once more */
		if (pthread_create (&thread_wdog, &attr_wdog, wdog, (void *)agent_info_ptr))
fatal ("pthread_create error %m");
}
#if AGENT_THREAD_COUNT < 1
fatal ("AGENT_THREAD_COUNT value is invalid");
#endif
/* start all the other threads (up to AGENT_THREAD_COUNT active at once) */
for (i = 0; i < agent_info_ptr->thread_count; i++) {
/* wait until "room" for another thread */
pthread_mutex_lock (&agent_info_ptr->thread_mutex);
while (agent_info_ptr->threads_active >= AGENT_THREAD_COUNT) {
pthread_cond_wait (&agent_info_ptr->thread_cond,
&agent_info_ptr->thread_mutex);
}
/* create thread specific data, NOTE freed from thread_per_node_rpc() */
task_specific_ptr = xmalloc (sizeof (task_info_t));
task_specific_ptr->thread_mutex_ptr = &agent_info_ptr->thread_mutex;
task_specific_ptr->thread_cond_ptr = &agent_info_ptr->thread_cond;
task_specific_ptr->threads_active_ptr = &agent_info_ptr->threads_active;
task_specific_ptr->thread_struct_ptr = &thread_ptr[i];
task_specific_ptr->msg_type = agent_info_ptr->msg_type;
task_specific_ptr->msg_args_ptr = *agent_info_ptr->msg_args_pptr;
if (pthread_attr_init (&thread_ptr[i].attr))
fatal ("pthread_attr_init error %m");
if (pthread_attr_setdetachstate (&thread_ptr[i].attr, PTHREAD_CREATE_DETACHED))
error ("pthread_attr_setdetachstate error %m");
#ifdef PTHREAD_SCOPE_SYSTEM
if (pthread_attr_setscope (&thread_ptr[i].attr, PTHREAD_SCOPE_SYSTEM))
error ("pthread_attr_setscope error %m");
#endif
		while ( (rc = pthread_create (&thread_ptr[i].thread,
		                              &thread_ptr[i].attr,
		                              thread_per_node_rpc,
		                              (void *) task_specific_ptr)) ) {
			error ("pthread_create error %m");
			if (agent_info_ptr->threads_active)
				pthread_cond_wait (&agent_info_ptr->thread_cond,
				                   &agent_info_ptr->thread_mutex);
			else {
				pthread_mutex_unlock (&agent_info_ptr->thread_mutex);
				sleep (1);
				pthread_mutex_lock (&agent_info_ptr->thread_mutex);
			}
		}
		agent_info_ptr->threads_active++;
		pthread_mutex_unlock (&agent_info_ptr->thread_mutex);
	}
/* wait for termination of remaining threads */
	pthread_join (thread_wdog, NULL);
cleanup:
if (agent_arg_ptr) {
if (agent_arg_ptr->slurm_addr)
xfree (agent_arg_ptr->slurm_addr);
if (agent_arg_ptr->node_names)
xfree (agent_arg_ptr->node_names);
if (agent_arg_ptr->msg_args) {
if (agent_arg_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH)
slurmctld_free_job_launch_msg (agent_arg_ptr->msg_args);
else
xfree (agent_arg_ptr->msg_args);
}
xfree (agent_arg_ptr);
}
if (agent_info_ptr) {
if (agent_info_ptr->thread_struct)
xfree (agent_info_ptr->thread_struct);
xfree (agent_info_ptr);
	}
	return NULL;
}
/*
* wdog - Watchdog thread. Send SIGALRM to threads which have been active for too long.
* Sleep for WDOG_POLL seconds between polls.
*/
static void *
wdog (void *args)
{
int i, fail_cnt, work_done, delay, max_delay = 0;
agent_info_t *agent_ptr = (agent_info_t *) args;
thd_t *thread_ptr = agent_ptr->thread_struct;
#if AGENT_IS_THREAD
	/* Locks: Write job and write node */
	slurmctld_lock_t node_write_lock = { NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
#else
	int done_cnt;
	char *slurm_names;
#endif
	while (1) {
		work_done = 1;	/* assume all threads complete for now */
		fail_cnt = 0;	/* assume all threads complete successfully for now */
		sleep (WDOG_POLL);
		pthread_mutex_lock (&agent_ptr->thread_mutex);
		for (i = 0; i < agent_ptr->thread_count; i++) {
			switch (thread_ptr[i].state) {
			case DSH_ACTIVE:
				work_done = 0;
				delay = difftime (time (NULL), thread_ptr[i].time);
				if (delay >= COMMAND_TIMEOUT)
					pthread_kill (thread_ptr[i].thread, SIGALRM);
				break;
			case DSH_NEW:
				work_done = 0;
				break;
			case DSH_DONE:
				if (max_delay < (int) thread_ptr[i].time)
					max_delay = (int) thread_ptr[i].time;
				break;
			case DSH_FAILED:
				fail_cnt++;
				break;
			}
		}
		if (work_done)
			break;	/* exit with the mutex still held */
		pthread_mutex_unlock (&agent_ptr->thread_mutex);
	}
/* Notify slurmctld of non-responding nodes */
if (fail_cnt) {
#if AGENT_IS_THREAD
/* Update node table data for non-responding nodes */
lock_slurmctld (node_write_lock);
for (i = 0; i < agent_ptr->thread_count; i++) {
if (thread_ptr[i].state == DSH_FAILED)
node_not_resp (thread_ptr[i].node_name);
}
unlock_slurmctld (node_write_lock);
#else
/* Build a list of all non-responding nodes and send it to slurmctld */
slurm_names = xmalloc (fail_cnt * MAX_NAME_LEN);
fail_cnt = 0;
for (i = 0; i < agent_ptr->thread_count; i++) {
if (thread_ptr[i].state == DSH_FAILED) {
strncpy (&slurm_names[MAX_NAME_LEN * fail_cnt],
thread_ptr[i].node_name, MAX_NAME_LEN);
error ("agent/wdog: node %s failed to respond",
thread_ptr[i].node_name);
fail_cnt++;
			}
		}
		/* send RPC */
		fatal ("Code development needed here if agent is not thread");
		xfree (slurm_names);
#endif
		if (agent_ptr->retry)
			queue_agent_retry (agent_ptr, fail_cnt);
	}
#if AGENT_IS_THREAD
/* Update last_response on responding nodes */
lock_slurmctld (node_write_lock);
for (i = 0; i < agent_ptr->thread_count; i++) {
if (thread_ptr[i].state == DSH_DONE)
node_did_resp (thread_ptr[i].node_name);
}
unlock_slurmctld (node_write_lock);
#else
/* Build a list of all responding nodes and send it to slurmctld to update time stamps */
done_cnt = agent_ptr->thread_count - fail_cnt;
slurm_names = xmalloc (done_cnt * MAX_NAME_LEN);
done_cnt = 0;
for (i = 0; i < agent_ptr->thread_count; i++) {
if (thread_ptr[i].state == DSH_DONE) {
strncpy (&slurm_names[MAX_NAME_LEN * done_cnt],
thread_ptr[i].node_name, MAX_NAME_LEN);
done_cnt++;
}
}
/* send RPC */
fatal ("Code development needed here if agent is not thread");
	xfree (slurm_names);
#endif
if (max_delay)
debug ("agent maximum delay %d seconds", max_delay);
pthread_mutex_unlock (&agent_ptr->thread_mutex);
return (void *) NULL;
}
/* thread_per_node_rpc - thread to issue an RPC to a single node, one of
 *	many such threads issued in parallel by agent().
 *	This xfrees the argument passed to it */
static void *
thread_per_node_rpc (void *args)
{
int msg_size ;
int rc ;
slurm_fd sockfd ;
slurm_msg_t request_msg ;
slurm_msg_t response_msg ;
return_code_msg_t * slurm_rc_msg ;
task_info_t *task_ptr = (task_info_t *) args;
	thd_t *thread_ptr;
state_t thread_state = DSH_FAILED;
sigset_t set;
/* set up SIGALRM handler */
if (sigemptyset (&set))
error ("sigemptyset error: %m");
if (sigaddset (&set, SIGALRM))
error ("sigaddset error on SIGALRM: %m");
if (sigprocmask (SIG_UNBLOCK, &set, NULL) != 0)
fatal ("sigprocmask error: %m");
xsignal(SIGALRM, alarm_handler);
	if (args == NULL)
		fatal ("thread_per_node_rpc has NULL argument");
	thread_ptr = task_ptr->thread_struct_ptr;	/* deref only after NULL check */
pthread_mutex_lock (task_ptr->thread_mutex_ptr);
thread_ptr->state = DSH_ACTIVE;
thread_ptr->time = time (NULL);
pthread_mutex_unlock (task_ptr->thread_mutex_ptr);
/* init message connection for message communication */
if ( ( sockfd = slurm_open_msg_conn (& thread_ptr->slurm_addr) ) == SLURM_SOCKET_ERROR ) {
error ("thread_per_node_rpc/slurm_open_msg_conn error %m");
goto cleanup;
}
/* send request message */
request_msg . msg_type = task_ptr->msg_type ;
request_msg . data = task_ptr->msg_args_ptr ;
if ( ( rc = slurm_send_node_msg ( sockfd , & request_msg ) ) == SLURM_SOCKET_ERROR ) {
error ("thread_per_node_rpc/slurm_send_node_msg error %m");
goto cleanup;
}
/* receive message */
if ( ( msg_size = slurm_receive_msg ( sockfd , & response_msg ) ) == SLURM_SOCKET_ERROR ) {
error ("thread_per_node_rpc/slurm_receive_msg error %m");
goto cleanup;
}
/* shutdown message connection */
if ( ( rc = slurm_shutdown_msg_conn ( sockfd ) ) == SLURM_SOCKET_ERROR ) {
error ("thread_per_node_rpc/slurm_shutdown_msg_conn error %m");
goto cleanup;
}
if ( msg_size ) {
error ("thread_per_node_rpc/msg_size error %d", msg_size);
goto cleanup;
}
switch ( response_msg . msg_type )
{
		case RESPONSE_SLURM_RC:
			slurm_rc_msg = ( return_code_msg_t * ) response_msg . data ;
			rc = slurm_rc_msg->return_code;
			slurm_free_return_code_msg ( slurm_rc_msg );
			if (rc)
				error ("thread_per_node_rpc/rc error %s",
				       slurm_strerror (rc));
			else {
				debug3 ("agent successfully processed RPC to node %s",
				        thread_ptr->node_name);
			}
			thread_state = DSH_DONE;	/* node responded */
			break ;
default:
error ("thread_per_node_rpc bad msg_type %d",response_msg.msg_type);
break ;
}
cleanup:
pthread_mutex_lock (task_ptr->thread_mutex_ptr);
thread_ptr->state = thread_state;
thread_ptr->time = (time_t) difftime (time (NULL), thread_ptr->time);
/* Signal completion so another thread can replace us */
(*task_ptr->threads_active_ptr)--;
pthread_cond_signal(task_ptr->thread_cond_ptr);
pthread_mutex_unlock (task_ptr->thread_mutex_ptr);
	return (void *) NULL;
}
/*
 * Emulate signal() but with BSD semantics (i.e. the handler is not reset
 * to SIG_DFL prior to executing the handler).
*/
static void xsignal(int signal, void (*handler)(int))
{
struct sigaction sa, old_sa;
sa.sa_handler = handler;
sigemptyset(&sa.sa_mask);
sigaddset(&sa.sa_mask, signal);
sa.sa_flags = 0;
sigaction(signal, &sa, &old_sa);
}
/*
 * SIGALRM handler. This is just a stub; its purpose is to interrupt the
 * blocking connect/send/receive calls in thread_per_node_rpc() above,
 * causing them to return EINTR.
*/
static void alarm_handler(int dummy)
{
}
/* queue_agent_retry - Queue any failed RPCs for later replay */
static void
queue_agent_retry (agent_info_t *agent_info_ptr, int count)
{
agent_arg_t *agent_arg_ptr;
thd_t *thread_ptr = agent_info_ptr->thread_struct;
int i, j;
if (count == 0)
return;
/* build agent argument with just the RPCs to retry */
agent_arg_ptr = xmalloc (sizeof (agent_arg_t));
agent_arg_ptr -> node_count = count;
agent_arg_ptr -> retry = 1;
agent_arg_ptr -> slurm_addr = xmalloc (sizeof (struct sockaddr_in) * count);
agent_arg_ptr -> node_names = xmalloc (MAX_NAME_LEN * count);
agent_arg_ptr -> msg_type = agent_info_ptr -> msg_type;
agent_arg_ptr -> msg_args = *(agent_info_ptr -> msg_args_pptr);
*(agent_info_ptr -> msg_args_pptr) = NULL;
j = 0;
for (i=0; i<agent_info_ptr->thread_count; i++) {
if (thread_ptr[i].state != DSH_FAILED)
continue;
agent_arg_ptr->slurm_addr[j] = thread_ptr[i].slurm_addr;
strncpy(&agent_arg_ptr->node_names[j*MAX_NAME_LEN],
thread_ptr[i].node_name, MAX_NAME_LEN);
if ((++j) == count)
break;
}
	/* add the request to the retry list */
pthread_mutex_lock (&retry_mutex);
if (retry_list == NULL) {
retry_list = list_create (&list_delete_retry);
if (retry_list == NULL)
fatal ("list_create failed");
}
	if (list_enqueue (retry_list, (void *)agent_arg_ptr) == 0)
		fatal ("list_enqueue failed");
pthread_mutex_unlock (&retry_mutex);
}
/*
* list_delete_retry - delete an entry from the retry list,
* see common/list.h for documentation
*/
static void
list_delete_retry (void *retry_entry)
{
	agent_arg_t *agent_arg_ptr;	/* pointer to retry list entry */
agent_arg_ptr = (agent_arg_t *) retry_entry;
if (agent_arg_ptr -> slurm_addr)
xfree (agent_arg_ptr -> slurm_addr);
if (agent_arg_ptr -> node_names)
xfree (agent_arg_ptr -> node_names);
#if AGENT_IS_THREAD
if (agent_arg_ptr -> msg_args)
xfree (agent_arg_ptr -> msg_args);
#endif
xfree (agent_arg_ptr);
}
/* agent_retry - Agent for retrying pending RPCs (top one on the queue),
* argument is unused */
void *
agent_retry (void *args)
{
agent_arg_t *agent_arg_ptr = NULL;
pthread_mutex_lock (&retry_mutex);
if (retry_list)
agent_arg_ptr = (agent_arg_t *) list_dequeue (retry_list);
pthread_mutex_unlock (&retry_mutex);
if (agent_arg_ptr)
spawn_retry_agent (agent_arg_ptr);
return NULL;
}
/* retry_pending - retry all pending RPCs for the given node name */
void
retry_pending (char *node_name)
{
int list_size = 0, i, j, found;
agent_arg_t *agent_arg_ptr = NULL;
pthread_mutex_lock (&retry_mutex);
if (retry_list) {
list_size = list_count (retry_list);
}
for (i = 0; i < list_size; i++) {
agent_arg_ptr = (agent_arg_t *) list_dequeue (retry_list);
found = 0;
for (j = 0; j < agent_arg_ptr->node_count; j++) {
if (strncmp (&agent_arg_ptr->node_names[j*MAX_NAME_LEN],
node_name, MAX_NAME_LEN))
continue;
found = 1;
break;
}
if (found) /* issue this RPC */
spawn_retry_agent (agent_arg_ptr);
else /* put the RPC back on the queue */
list_enqueue (retry_list, (void*) agent_arg_ptr);
}
pthread_mutex_unlock (&retry_mutex);
}
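
/*
 * Illustrative sketch (kept out of the build with #if 0): a plausible call
 * site for retry_pending().  The surrounding function name is hypothetical;
 * the idea is that once a node is heard from again, its queued RPCs are
 * replayed.
 */
#if 0
static void example_node_responded (char *node_name)
{
	node_did_resp (node_name);	/* update node table time stamp */
	retry_pending (node_name);	/* replay RPCs queued for this node */
}
#endif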
/* spawn_retry_agent - pthread_create an agent for the given task */
static void
spawn_retry_agent (agent_arg_t *agent_arg_ptr)
{
pthread_attr_t attr_agent;
pthread_t thread_agent;
if (agent_arg_ptr == NULL)
return;
debug3 ("Spawning RPC retry agent");
if (pthread_attr_init (&attr_agent))
fatal ("pthread_attr_init error %m");
if (pthread_attr_setdetachstate (&attr_agent, PTHREAD_CREATE_DETACHED))
error ("pthread_attr_setdetachstate error %m");
if (pthread_attr_setscope (&attr_agent, PTHREAD_SCOPE_SYSTEM))
error ("pthread_attr_setscope error %m");
	if (pthread_create (&thread_agent, &attr_agent,
	                    agent, (void *)agent_arg_ptr)) {
		error ("pthread_create error %m");
		sleep (1);	/* sleep and try once more */
		if (pthread_create (&thread_agent, &attr_agent,
		                    agent, (void *)agent_arg_ptr))
			fatal ("pthread_create error %m");
	}
}
/* slurmctld_free_job_launch_msg is a variant of slurm_free_job_launch_msg
 *	because all environment variables are currently loaded in a single
 *	xmalloc'd buffer (see get_job_env()), which differs from how slurmd
 *	assembles the data from a message */
void slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg)
{
if (msg) {
if (msg->environment) {
if (msg->environment[0])
xfree(msg->environment[0]);
xfree(msg->environment);
msg->environment = NULL;
}
slurm_free_job_launch_msg (msg);
}
}
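
/*
 * Illustrative sketch (kept out of the build with #if 0) of the single-buffer
 * environment layout assumed by slurmctld_free_job_launch_msg() above:
 * environment[] is an xmalloc'd pointer array whose entries all point into
 * one xmalloc'd character buffer, so freeing environment[0] and the array
 * releases everything.  The strings and sizes here are made up.
 */
#if 0
static char **example_pack_env (void)
{
	char *buf = xmalloc (32);			/* one buffer ... */
	char **env = xmalloc (3 * sizeof (char *));	/* ... many pointers */

	strcpy (buf, "PATH=/bin");	/* occupies buf[0..9], NUL included */
	strcpy (buf + 10, "HOME=/tmp");
	env[0] = buf;		/* first entry owns the whole buffer */
	env[1] = buf + 10;	/* later entries point into the same buffer */
	env[2] = NULL;
	return env;
}
#endif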
/* agent_purge - purge all pending RPC requests */
void agent_purge (void)
{
	pthread_mutex_lock (&retry_mutex);
	if (retry_list) {
		list_destroy (retry_list);
		retry_list = NULL;
	}
	pthread_mutex_unlock (&retry_mutex);
}