diff --git a/src/slurmctld/agent.c b/src/slurmctld/agent.c index 1463b27a9ae568fcb512eee5006a871a5739fb47..704705e8c44defc01344739ec7e92545748af80a 100644 --- a/src/slurmctld/agent.c +++ b/src/slurmctld/agent.c @@ -4,7 +4,7 @@ * Copyright (C) 2002 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). * Written by Moe Jette <jette@llnl.gov>, et. al. - * Derived from dsh written by Jim Garlick <garlick1@llnl.gov> + * Derived from pdsh written by Jim Garlick <garlick1@llnl.gov> * UCRL-CODE-2002-040. * * This file is part of SLURM, a resource management program. @@ -31,16 +31,18 @@ * For example, when a job step completes slurmctld needs to revoke credentials * for that job step on every node it was allocated to. We don't want to * hang slurmctld's primary functions to perform this work, so it just - * initiates an agent to perform the work. + * initiates an agent to perform the work. The agent is passed all details + * required to perform the work, so it will be possible to execute the + * agent as an pthread, process, or even a daemon on some other computer. * * The main thread creates a separate thread for each node to be communicated * with. A special watchdog thread sends SIGLARM to any threads that have been - * active (in DSH_ACTIVE state) for more than COMMAND_TIMEOUT seconds. + * active (in DSH_ACTIVE state) for more than COMMAND_TIMEOUT seconds. + * The agent responds to slurmctld via an RPC as required. + * For example, informing slurmctld that some node is not responding. * - * All the state for a thread is contained in thd_t struct. An array of - * these structures is declared globally so signal handlers can access. - * The array is initialized by dsh() below, and the rsh() function for each - * thread is passed the element corresponding to one connection. + * All the state for each thread is contained in thd_t struct, which is + * passed to the agent upon startup and freed upon completion. \*****************************************************************************/ #ifdef HAVE_CONFIG_H @@ -52,123 +54,163 @@ #include <signal.h> #include <unistd.h> -#include <src/slurmctld/slurmctld.h> +typedef struct task_info { + pthread_mutex_t *thread_mutex; /* agent specific mutex */ + pthread_cond_t *thread_cond; /* agent specific condition */ + uint32_t *threads_active; /* count of currently active threads */ + thd_t *thread_struct; /* thread structures */ + slurm_msg_type_t msg_type; /* RPC to be issued */ + void *msg_args; /* RPC data to be used */ +} task_info_t; -#define AGENT_THREAD_COUNT 10 -#define COMMAND_TIMEOUT 10 /* secs */ -#define INTR_TIME 1 /* secs */ -#define WDOG_POLL 2 /* secs */ - -typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t; - -typedef struct thd { - pthread_t thread; /* thread ID */ - pthread_attr_t attr; /* thread attributes */ - state_t state; /* thread state */ - time_t start; /* time stamp for start */ - time_t finish; /* time stamp for finish */ - struct sockaddr_in slurm_addr; /* network address */ - char node_name[MAX_NAME_LEN]; /* name of the node */ -} thd_t; - -/* - * Mutex and condition variable for implementing `fanout'. When a thread - * terminates, it decrements threadcount and signals threadcount_cond. - * The main, once it has spawned the fanout number of threads, suspends itself - * until a thread termintates. - */ -static pthread_mutex_t threadcount_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t threadcount_cond = PTHREAD_COND_INITIALIZER; -static int threadcount = 0; - -void *thread_revoke_job_cred (void *arg); +void *thread_revoke_job_cred (void *args); void *wdog (void *args); -/* - * Watchdog thread. Send SIGALRM to - * - threads which have been active for too long - * Sleep for WDOG_POLL seconds between polls (actually sleep for COMMAND_TIMEOUT - * on the first iteration). +/* + * agent - party responsible for performing some task in parallel across a set of nodes + * input: pointer to agent_info_t, which is xfree'd upon completion if AGENT_IS_THREAD is set */ -static void * -wdog (void *args) -{ - int i; - - for (;;) { - for (i = 0; t[i].host != NULL; i++) { - switch (t[i].state) { - case DSH_ACTIVE: - if (t[i].start + COMMAND_TIMEOUT < time(NULL)) - pthread_kill(t[i].thread, SIGALRM); - break; - case DSH_NEW: - case DSH_DONE: - break; - } - } - sleep(i == 0 ? COMMAND_TIMEOUT : WDOG_POLL); - } - return NULL; -} - -/* agent_revoke_job_cred - thread responsible for revoking all credentials on all - * nodes for a particular job */ void * -agent_revoke_job_cred (void *arg) +agent (void *args) { int i, rc; pthread_attr_t attr_wdog; pthread_t thread_wdog; revoke_credential_msg_t revoke_job_cred; + agent_info_t *agent_ptr = (agent_info_t *) args; + thd_t *thread_ptr = agent_ptr->thread_struct; + task_info_t *task_specific_ptr; - node_count = ? - revoke_job_cred = ? + /* basic argument value tests */ + if (agent_ptr->thread_count == 0) + go to cleanup; + if (thread_ptr == NULL) { + error ("agent_revoke_job_cred passed null thread_struct"); + go to cleanup; + } + if (agent_ptr->msg_type != REQUEST_REVOKE_JOB_CREDENTIAL) { + fatal ("agent_revoke_job_cred passed invaid message type %d", agent_ptr->msg_type); + go to cleanup; + } + + /* initialize the thread data structure */ + agent_ptr->thread_mutex = PTHREAD_MUTEX_INITIALZER; + agent_ptr->thread_cond = PTHREAD_COND_INITIALZER; + agent_ptr->threads_active = 0; + for (i = 0; i < agent_ptr->thread_count; i++) { + thread_ptr[i].state = DSH_NEW; + } /* start the watchdog thread */ if (pthread_attr_init (&attr_wdog)) error ("pthread_attr_init errno %d", errno); if (pthread_attr_setdetachstate (&attr_wdog, PTHREAD_CREATE_DETACHED)) error ("pthread_attr_setdetachstate errno %d", errno); - if (pthread_create (&thread_wdog, &attr_wdog, wdog, (void *)t) { +#ifdef PTHREAD_SCOPE_SYSTEM + /* we want 1:1 threads if there is a choice */ + pthread_attr_setscope (&attr_wdog, PTHREAD_SCOPE_SYSTEM); +#endif + if (pthread_create (&thread_wdog, &attr_wdog, wdog, args) { error ("pthread_create errno %d", errno); - pthread_exit (errno); + sleep (1); + if (pthread_create (&thread_wdog, &attr_wdog, wdog, args) + fatal ("pthread_create errno %d", errno); } /* start all the other threads (at most AGENT_THREAD_COUNT active at once) */ - for (i = 0; i < node_count; i++) { + for (i = 0; i < agent_ptr->thread_count; i++) { /* wait until "room" for another thread */ - pthread_mutex_lock (&threadcount_mutex); - - if (AGENT_THREAD_COUNT == threadcount) - pthread_cond_wait (&threadcount_cond, &threadcount_mutex); + pthread_mutex_lock (&agent_ptr->thread_mutex); + if (AGENT_THREAD_COUNT == agent_ptr->threads_active) + pthread_cond_wait (&agent_ptr->thread_cond, &agent_ptr->thread_mutex); /* create thread */ - pthread_attr_init (&t[i].attr); - pthread_attr_setdetachstate (&t[i].attr, PTHREAD_CREATE_DETACHED); + task_specific_ptr = malloc (sizeof (task_info_t)); + task_specific_ptr->thread_mutex = &agent_ptr->thread_mutex; + task_specific_ptr->thread_cond = &agent_ptr->thread_cond; + task_specific_ptr->threads_active = &agent_ptr->thread_count; + task_specific_ptr->thread_struct = &thread_ptr[i]; + task_specific_ptr->msg_type = agent_ptr->msg_type; + task_specific_ptr->msg_args = &agent_ptr->msg_args; + + pthread_attr_init (&thread_ptr[i].attr); + pthread_attr_setdetachstate (&thread_ptr[i].attr, PTHREAD_CREATE_DETACHED); #ifdef PTHREAD_SCOPE_SYSTEM /* we want 1:1 threads if there is a choice */ - pthread_attr_setscope (&t[i].attr, PTHREAD_SCOPE_SYSTEM); + pthread_attr_setscope (&thread_ptr[i].attr, PTHREAD_SCOPE_SYSTEM); #endif - rc = pthread_create (&t[i].thread, &t[i].attr, thread_revoke_job_cred, (void *)&t[i])); - threadcount++; - pthread_mutex_unlock(&threadcount_mutex); + if (agent_ptr->msg_type != REQUEST_REVOKE_JOB_CREDENTIAL) { + rc = pthread_create (&thread_ptr[i].thread, &thread_ptr[i].attr, + thread_revoke_job_cred, (void *) task_specific_ptr); + + agent_ptr->threads_active++; + pthread_mutex_unlock (&agent_ptr->thread_mutex); - if (rc) { - error ("pthread_create errno %d\n", errno); - thread_revoke_job_cred ((void *)&t[i])); + if (rc) { + error ("pthread_create errno %d\n", errno); + /* execute task within this thread */ + thread_revoke_job_cred ((void *) task_specific_ptr); + } } } /* wait for termination of remaining threads */ - pthread_mutex_lock(&threadcount_mutex); - while (threadcount > 0) - pthread_cond_wait(&threadcount_cond, &threadcount_mutex); + pthread_mutex_lock(&agent_ptr->thread_mutex); + while (agent_ptr->threads_active > 0) + pthread_cond_wait(&agent_ptr->thread_cond, &agent_ptr->thread_mutex); + +cleanup: +#if AGENT_IS_THREAD + if (thread_ptr) + xfree (thread_ptr); + if (agent_ptr->msg_args) + xfree (agent_ptr->msg_args); + xfree (agent_ptr); +#endif + return NULL; +} +/* + * wdog - Watchdog thread. Send SIGALRM to threads which have been active for too long. + * Sleep for WDOG_POLL seconds between polls. + */ +static void * +wdog (void *args) +{ + int i, not_done; + agent_info_t *agent_ptr = (agent_info_t *) args; + thd_t *thread_ptr = agent_ptr->thread_struct; + time_t min_start; + + while (1) { + not_done = 0; /* assume all threads complete for now */ + sleep (WDOG_POLL); + min_start = time(NULL) - COMMAND_TIMEOUT; + + pthread_mutex_lock (&agent_ptr->thread_mutex); + for (i = 0; i < agent_ptr->thread_count; i++) { + switch (thread_ptr[i].state) { + case DSH_ACTIVE: + not_done = 1; + if (thread_ptr[i].start < min_start) + pthread_kill(thread_ptr[i].thread, SIGALRM); + break; + case DSH_NEW: + not_done = 1; + break; + case DSH_DONE: + case DSH_FAILED: + break; + } + } + pthread_mutex_unlock (&agent_ptr->thread_mutex); + if (not_done == 0) + return NULL; + } } -/* thread_revoke_job_cred - thread to revoke a credential on every node upon a job's completion */ +/* thread_revoke_job_cred - thread to revoke a credential on a collection of nodes */ void * thread_revoke_job_cred (void *arg) { @@ -179,10 +221,13 @@ thread_revoke_job_cred (void *arg) slurm_msg_t request_msg ; slurm_msg_t response_msg ; return_code_msg_t * slurm_rc_msg ; - thd_t *thd_ptr = (thd_t *)arg; + task_info_t *task_ptr = (task_info_t *) args; + thd_t *thread_ptr = task_ptr->thread_struct; - thd_ptr->state = DSH_ACTIVE; - thd_ptr->start = time(NULL); + pthread_mutex_lock (task_ptr->thread_mutex); + thread_ptr->state = DSH_ACTIVE; + thread_ptr->start = time(NULL); + pthread_mutex_unlock (task_ptr->thread_mutex); /* accept SIGALRM */ if (sigemptyset (&set)) @@ -191,32 +236,32 @@ thread_revoke_job_cred (void *arg) error ("sigaddset errno %d on SIGALRM", errno); /* init message connection for message communication with slurmd */ - if ( ( sockfd = slurm_open_msg_conn (& thd_ptr -> slurm_addr) ) == SLURM_SOCKET_ERROR ) { - error ("thread_revoke_job_cred/slurm_open_msg_conn error for %s", thd_ptr->node_name); + if ( ( sockfd = slurm_open_msg_conn (& thread_ptr->slurm_addr) ) == SLURM_SOCKET_ERROR ) { + error ("thread_revoke_job_cred/slurm_open_msg_conn error for %s", thread_ptr->node_name); goto cleanup; } /* send request message */ - request_msg . msg_type = REQUEST_REVOKE_JOB_CREDENTIAL ; - request_msg . data = revoke_job_cred_ptr ; + request_msg . msg_type = task_ptr->msg_type ; + request_msg . data = task_ptr->msg_args ; if ( ( rc = slurm_send_node_msg ( sockfd , & request_msg ) ) == SLURM_SOCKET_ERROR ) { - error ("thread_revoke_job_cred/slurm_send_node_msg error for %s", thd_ptr->node_name); + error ("thread_revoke_job_cred/slurm_send_node_msg error for %s", thread_ptr->node_name); goto cleanup; } /* receive message */ if ( ( msg_size = slurm_receive_msg ( sockfd , & response_msg ) ) == SLURM_SOCKET_ERROR ) { - error ("thread_revoke_job_cred/slurm_receive_msg error for %s", thd_ptr->node_name); + error ("thread_revoke_job_cred/slurm_receive_msg error for %s", thread_ptr->node_name); goto cleanup; } /* shutdown message connection */ if ( ( rc = slurm_shutdown_msg_conn ( sockfd ) ) == SLURM_SOCKET_ERROR ) { - error ("thread_revoke_job_cred/slurm_shutdown_msg_conn error for %s", thd_ptr->node_nam); + error ("thread_revoke_job_cred/slurm_shutdown_msg_conn error for %s", thread_ptr->node_nam); goto cleanup; } if ( msg_size ) { - error ("thread_revoke_job_cred/msg_size error %d for %s", msg_size, thd_ptr->node_nam); + error ("thread_revoke_job_cred/msg_size error %d for %s", msg_size, thread_ptr->node_nam); goto cleanup; } @@ -227,23 +272,23 @@ thread_revoke_job_cred (void *arg) rc = slurm_rc_msg->return_code; slurm_free_return_code_msg ( slurm_rc_msg ); if (rc) - error ("thread_revoke_job_cred/rc error %d for %s", rc, thd_ptr->node_nam); + error ("thread_revoke_job_cred/rc error %d for %s", rc, thread_ptr->node_nam); break ; default: error ("thread_revoke_job_cred/msg_type error %d for %s", - response_msg.msg_type, thd_ptr->node_nam); + response_msg.msg_type, thread_ptr->node_nam); break ; } cleanup: - thd_ptr->state = DSH_DONE; - thd_ptr->finish = time(NULL); + pthread_mutex_lock (task_ptr->thread_mutex); + thread_ptr->state = DSH_DONE; + thread_ptr->finish = time(NULL); /* Signal completion so another thread can replace us */ - pthread_mutex_lock(&threadcount_mutex); - threadcount--; - pthread_cond_signal(&threadcount_cond); - pthread_mutex_unlock(&threadcount_mutex); + (*task_ptr->threads_active)--; + pthread_cond_signal(task_ptr->thread_cond); + pthread_mutex_unlock (task_ptr->thread_mutex); pthread_exit ((void *)NULL); } diff --git a/src/slurmctld/agent.h b/src/slurmctld/agent.h new file mode 100644 index 0000000000000000000000000000000000000000..1734c8ecd12d4a52a70513a634ce3264a63353a4 --- /dev/null +++ b/src/slurmctld/agent.h @@ -0,0 +1,60 @@ +/*****************************************************************************\ + * agent.h - data structures and function definitions for parallel + * background communications + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Moe Jette <jette@llnl.gov>, et. al. + * Derived from dsh written by Jim Garlick <garlick1@llnl.gov> + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + ***************************************************************************** + +#include <src/slurmctld/agent.h> +#include <src/slurmctld/slurmctld.h> + +#define AGENT_IS_THREAD 1 /* set if agent itself a thread of slurmctld */ +#define AGENT_THREAD_COUNT 10 /* maximum active agent threads */ +#define COMMAND_TIMEOUT 10 /* secs */ +#define INTR_TIME 1 /* secs */ +#define WDOG_POLL 2 /* secs */ + +typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t; + +typedef struct thd { + pthread_t thread; /* thread ID */ + pthread_attr_t attr; /* thread attributes */ + state_t state; /* thread state */ + time_t start; /* time stamp for start */ + time_t finish; /* time stamp for finish */ + struct sockaddr_in slurm_addr; /* network address */ + char node_name[MAX_NAME_LEN]; /* name of the node */ +} thd_t; + +typedef struct agent_info { + pthread_mutex_t thread_mutex; /* agent specific mutex */ + pthread_cond_t thread_cond; /* agent specific condition */ + uint32_t thread_count; /* number of threads records */ + uint32_t threads_active; /* count of currently active threads */ + thd_t *thread_struct; /* thread structures */ + slurm_msg_type_t msg_type; /* RPC to be issued */ + void *msg_args; /* RPC data to be used */ +} agent_info_t; + +void *agent (void *args); diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c index 0634ff8925438d8a58efb7e05ec925c88a13aa20..5c3b6293177e339b6da717eb88754a7e743e486a 100644 --- a/src/slurmctld/controller.c +++ b/src/slurmctld/controller.c @@ -163,6 +163,10 @@ main (int argc, char *argv[]) /* create attached thread for background activities */ if (pthread_attr_init (&thread_attr_bg)) fatal ("pthread_attr_init errno %d", errno); +#ifdef PTHREAD_SCOPE_SYSTEM + /* we want 1:1 threads if there is a choice */ + pthread_attr_setscope (&thread_attr_bg, PTHREAD_SCOPE_SYSTEM); +#endif if (pthread_create ( &thread_id_bg, &thread_attr_bg, slurmctld_background, NULL)) fatal ("pthread_create errno %d", errno); @@ -172,6 +176,10 @@ main (int argc, char *argv[]) pthread_mutex_unlock(&thread_count_lock); if (pthread_attr_init (&thread_attr_rpc)) fatal ("pthread_attr_init errno %d", errno); +#ifdef PTHREAD_SCOPE_SYSTEM + /* we want 1:1 threads if there is a choice */ + pthread_attr_setscope (&thread_attr_rpc, PTHREAD_SCOPE_SYSTEM); +#endif if (pthread_create ( &thread_id_rpc, &thread_attr_rpc, slurmctld_rpc_mgr, NULL)) fatal ("pthread_create errno %d", errno); @@ -236,6 +244,10 @@ slurmctld_rpc_mgr ( void * no_data ) fatal ("pthread_attr_init errno %m %d", errno); if (pthread_attr_setdetachstate (&thread_attr_rpc_req, PTHREAD_CREATE_DETACHED)) fatal ("pthread_attr_setdetachstate errno %m %d", errno); +#ifdef PTHREAD_SCOPE_SYSTEM + /* we want 1:1 threads if there is a choice */ + pthread_attr_setscope (&thread_attr_rpc_req, PTHREAD_SCOPE_SYSTEM); +#endif /* initialize port for RPCs */ if ( ( sockfd = slurm_init_msg_engine_port ( slurmctld_conf . slurmctld_port ) )