From 2846669b096056bf94461a30bd5f3e89a426673c Mon Sep 17 00:00:00 2001
From: Moe Jette <jette1@llnl.gov>
Date: Thu, 19 Sep 2002 00:28:50 +0000
Subject: [PATCH] Framework for agent in place and tested except for failure
 reporting.

---
 src/slurmctld/agent.c          | 160 +++++++++++++++++++--------------
 src/slurmctld/controller.c     |   9 +-
 src/slurmctld/node_scheduler.c |  53 +++++++++--
 3 files changed, 146 insertions(+), 76 deletions(-)

diff --git a/src/slurmctld/agent.c b/src/slurmctld/agent.c
index 31357951c62..316563e4f50 100644
--- a/src/slurmctld/agent.c
+++ b/src/slurmctld/agent.c
@@ -27,19 +27,19 @@
 *****************************************************************************
 * Theory of operation:
 *
- * The functions below permit slurmctld to initiate parallel tasks as a
+ * The functions below permit slurm to initiate parallel tasks as a
 * detached thread and let the functions below make sure the work happens.
 * For example, when a job step completes slurmctld needs to revoke credentials
- * for that job step on every node it was allocated to. We don't want to
- * hang slurmctld's primary functions to perform this work, so it just
- * initiates an agent to perform the work. The agent is passed all details
- * required to perform the work, so it will be possible to execute the
- * agent as an pthread, process, or even a daemon on some other computer.
+ * for that job step on every node to which it was allocated. We don't want to
+ * hang slurmctld's primary function (the job complete RPC) to perform this
+ * work, so it just initiates an agent to perform the work. The agent is passed
+ * all details required to perform the work, so it will be possible to execute
+ * the agent as a pthread, process, or even a daemon on some other computer.
 *
- * The main thread creates a separate thread for each node to be communicated
- * with up to AGENT_THREAD_COUNT. A special watchdog thread sends SIGLARM to
- * any threads that have been active (in DSH_ACTIVE state) for more than
- * COMMAND_TIMEOUT seconds.
+ * The main agent thread creates a separate thread for each node to be
+ * communicated with up to AGENT_THREAD_COUNT. A special watchdog thread sends
+ * SIGALRM to any threads that have been active (in DSH_ACTIVE state) for more
+ * than COMMAND_TIMEOUT seconds.
 * The agent responds to slurmctld via an RPC as required.
 * For example, informing slurmctld that some node is not responding.
 *
@@ -101,8 +101,10 @@ static void *thread_per_node_rpc (void *args);
 static void *wdog (void *args);
 
 /*
- * agent - party responsible for transmitting an common RPC in parallel across a set of nodes
- * input: pointer to agent_arg_t, which is xfree'd upon completion if AGENT_IS_THREAD is set
+ * agent - party responsible for transmitting a common RPC in parallel across a set
+ *	of nodes
+ * input: pointer to agent_arg_t, which is xfree'd (including slurm_addr and msg_args)
+ *	upon completion if AGENT_IS_THREAD is set
 */
 void *
 agent (void *args)
@@ -110,58 +112,66 @@ agent (void *args)
 	int i, rc;
 	pthread_attr_t attr_wdog;
 	pthread_t thread_wdog;
-	agent_arg_t *agent_arg_ptr;
+	agent_arg_t *agent_arg_ptr = args;
 	agent_info_t *agent_info_ptr = NULL;
 	thd_t *thread_ptr;
 	task_info_t *task_specific_ptr;
 
 	/* basic argument value tests */
+	if (agent_arg_ptr == NULL)
+		fatal ("agent NULL argument");
 	if (agent_arg_ptr->addr_count == 0)
-		goto cleanup;
+		goto cleanup;	/* no messages to be sent */
 	if (agent_arg_ptr->slurm_addr == NULL)
-		error ("agent passed null address list");
+		fatal ("agent passed NULL address list");
 	if (agent_arg_ptr->msg_type != REQUEST_REVOKE_JOB_CREDENTIAL)
-		fatal ("agent passed invaid message type %d", agent_arg_ptr->msg_type);
+		fatal ("agent passed invalid message type %d", agent_arg_ptr->msg_type);
 
 	/* initialize the data structures */
 	agent_info_ptr = xmalloc (sizeof (agent_info_t));
-	thread_ptr = xmalloc (agent_arg_ptr->addr_count * sizeof (thd_t));
 	if (pthread_mutex_init (&agent_info_ptr->thread_mutex, NULL))
-		fatal ("agent: pthread_mutex_init error %m");
+		fatal ("pthread_mutex_init error %m");
 	if (pthread_cond_init (&agent_info_ptr->thread_cond, NULL))
-		fatal ("agent: pthread_cond_init error %m");
+		fatal ("pthread_cond_init error %m");
 	agent_info_ptr->thread_count = agent_arg_ptr->addr_count;
 	agent_info_ptr->threads_active = 0;
+	thread_ptr = xmalloc (agent_arg_ptr->addr_count * sizeof (thd_t));
 	agent_info_ptr->thread_struct = thread_ptr;
 	agent_info_ptr->msg_type = agent_arg_ptr->msg_type;
 	agent_info_ptr->msg_args = agent_arg_ptr->msg_args;
 	for (i = 0; i < agent_info_ptr->thread_count; i++) {
 		thread_ptr[i].state = DSH_NEW;
+		thread_ptr[i].slurm_addr = agent_arg_ptr->slurm_addr[i];
 	}
 
 	/* start the watchdog thread */
 	if (pthread_attr_init (&attr_wdog))
-		fatal ("agent: pthread_attr_init error %m");
-	if (pthread_attr_setdetachstate (&attr_wdog, PTHREAD_CREATE_DETACHED))
-		error ("agent: pthread_attr_setdetachstate error %m");
+		fatal ("pthread_attr_init error %m");
+	if (pthread_attr_setdetachstate (&attr_wdog, PTHREAD_CREATE_JOINABLE))
+		error ("pthread_attr_setdetachstate error %m");
#ifdef PTHREAD_SCOPE_SYSTEM
 	if (pthread_attr_setscope (&attr_wdog, PTHREAD_SCOPE_SYSTEM))
-		error ("agent: pthread_attr_setscope error %m");
+		error ("pthread_attr_setscope error %m");
#endif
 	if (pthread_create (&thread_wdog, &attr_wdog, wdog, (void *)agent_info_ptr)) {
-		error ("agent: pthread_create error %m");
+		error ("pthread_create error %m");
 		sleep (1);	/* sleep and try once more */
 		if (pthread_create (&thread_wdog, &attr_wdog, wdog, args))
-			fatal ("agent: pthread_create error %m");
+			fatal ("pthread_create error %m");
 	}
 
-	/* start all the other threads (at most AGENT_THREAD_COUNT active at once) */
+	/* start all the other threads (up to AGENT_THREAD_COUNT active at once) */
 	for (i = 0; i < agent_info_ptr->thread_count; i++) {
 		/* wait until "room" for another thread */
 		pthread_mutex_lock (&agent_info_ptr->thread_mutex);
-		if (AGENT_THREAD_COUNT == agent_info_ptr->threads_active)
-			pthread_cond_wait (&agent_info_ptr->thread_cond, &agent_info_ptr->thread_mutex);
+#if AGENT_THREAD_COUNT < 1
+		fatal ("AGENT_THREAD_COUNT value is invalid");
+#endif
+		while (agent_info_ptr->threads_active >= AGENT_THREAD_COUNT) {
+			pthread_cond_wait (&agent_info_ptr->thread_cond,
+					   &agent_info_ptr->thread_mutex);
+		}
 
 		/* create thread */
 		task_specific_ptr = xmalloc (sizeof (task_info_t));
@@ -172,24 +182,31 @@ agent (void *args)
 		task_specific_ptr->msg_type = agent_info_ptr->msg_type;
 		task_specific_ptr->msg_args = &agent_info_ptr->msg_args;
 
-		pthread_attr_init (&thread_ptr[i].attr);
-		pthread_attr_setdetachstate (&thread_ptr[i].attr, PTHREAD_CREATE_JOINABLE);
+		if (pthread_attr_init (&thread_ptr[i].attr))
+			fatal ("pthread_attr_init error %m");
+		if (pthread_attr_setdetachstate (&thread_ptr[i].attr, PTHREAD_CREATE_DETACHED))
+			error ("pthread_attr_setdetachstate error %m");
#ifdef PTHREAD_SCOPE_SYSTEM
-		pthread_attr_setscope (&thread_ptr[i].attr, PTHREAD_SCOPE_SYSTEM);
+		if (pthread_attr_setscope (&thread_ptr[i].attr, PTHREAD_SCOPE_SYSTEM))
+			error ("pthread_attr_setscope error %m");
#endif
-		if (agent_info_ptr->msg_type != REQUEST_REVOKE_JOB_CREDENTIAL) {
-			rc = pthread_create (&thread_ptr[i].thread, &thread_ptr[i].attr,
-					     thread_per_node_rpc, (void *) task_specific_ptr);
-
-			agent_info_ptr->threads_active++;
-			pthread_mutex_unlock (&agent_info_ptr->thread_mutex);
-
-			if (rc) {
-				error ("pthread_create error %m");
-				/* execute task within this thread */
-				thread_per_node_rpc ((void *) task_specific_ptr);
+		while ( (rc = pthread_create (&thread_ptr[i].thread,
+					      &thread_ptr[i].attr,
+					      thread_per_node_rpc,
+					      (void *) task_specific_ptr)) ) {
+			error ("pthread_create error %m");
+			if (agent_info_ptr->threads_active)
+				pthread_cond_wait (&agent_info_ptr->thread_cond,
+						   &agent_info_ptr->thread_mutex);
+			else {
+				pthread_mutex_unlock (&agent_info_ptr->thread_mutex);
+				sleep (1);
+				pthread_mutex_lock (&agent_info_ptr->thread_mutex);
 			}
 		}
+
+		agent_info_ptr->threads_active++;
+		pthread_mutex_unlock (&agent_info_ptr->thread_mutex);
 	}
 
 	/* wait for termination of remaining threads */
@@ -197,16 +214,22 @@ agent (void *args)
 	while (agent_info_ptr->threads_active > 0)
 		pthread_cond_wait(&agent_info_ptr->thread_cond, &agent_info_ptr->thread_mutex);
 	pthread_join (thread_wdog, NULL);
-	return NULL;
 
 cleanup:
#if AGENT_IS_THREAD
-	if (agent_arg_ptr->slurm_addr)
-		xfree (agent_arg_ptr->slurm_addr);
-	if (agent_arg_ptr->msg_args)
-		xfree (agent_arg_ptr->msg_args);
-	xfree (agent_arg_ptr);
+	if (agent_arg_ptr) {
+		if (agent_arg_ptr->slurm_addr)
+			xfree (agent_arg_ptr->slurm_addr);
+		if (agent_arg_ptr->msg_args)
+			xfree (agent_arg_ptr->msg_args);
+		xfree (agent_arg_ptr);
+	}
#endif
+	if (agent_info_ptr) {
+		if (agent_info_ptr->thread_struct)
+			xfree (agent_info_ptr->thread_struct);
+		xfree (agent_info_ptr);
+	}
 	return NULL;
 }
 
@@ -217,44 +240,39 @@ cleanup:
 static void *
 wdog (void *args)
 {
-	int i, fail_cnt, work_done;
+	int i, fail_cnt, work_done, delay, max_delay = 0;
 	agent_info_t *agent_ptr = (agent_info_t *) args;
 	thd_t *thread_ptr = agent_ptr->thread_struct;
-	time_t min_start;
-	time_t max_time;
 
 	while (1) {
 		work_done = 1;	/* assume all threads complete for now */
 		fail_cnt = 0;	/* assume all threads complete successfully for now */
-		max_time = 0;
 		sleep (WDOG_POLL);
-		min_start = time(NULL) - COMMAND_TIMEOUT;
 
 		pthread_mutex_lock (&agent_ptr->thread_mutex);
 		for (i = 0; i < agent_ptr->thread_count; i++) {
 			switch (thread_ptr[i].state) {
			case DSH_ACTIVE:
 				work_done = 0;
-				if (thread_ptr[i].time < min_start)
+				delay = difftime (time (NULL), thread_ptr[i].time);
+				if ( delay >= COMMAND_TIMEOUT)
 					pthread_kill(thread_ptr[i].thread, SIGALRM);
 				break;
			case DSH_NEW:
 				work_done = 0;
 				break;
			case DSH_DONE:
-				if (max_time < thread_ptr[i].time)
-					max_time = thread_ptr[i].time;
+				if ( max_delay < (int) thread_ptr[i].time )
+					max_delay = (int) thread_ptr[i].time;
 				break;
			case DSH_FAILED:
 				fail_cnt++;
 				break;
 			}
 		}
-		pthread_mutex_unlock (&agent_ptr->thread_mutex);
-		if (work_done) {
-			info ("max time used %ld msec", (long) max_time);
+		if (work_done)
 			break;
-		}
+		pthread_mutex_unlock (&agent_ptr->thread_mutex);
 	}
 
 	/* Notify slurmctld of non-responding nodes */
@@ -268,7 +286,7 @@ wdog (void *args)
 				continue;
 			/* build a list of slurm_addr's */
 			slurm_addr[fail_cnt++] = thread_ptr[i].slurm_addr;
-
+			xfree (thread_ptr);
 		}
 		info ("agent/wdog: %d nodes failed to respond", fail_cnt);
@@ -276,7 +294,9 @@
 		xfree (slurm_addr);
 	}
 
+	info ("agent maximum delay %d seconds", max_delay);
+	pthread_mutex_unlock (&agent_ptr->thread_mutex);
 
 	return (void *) NULL;
 }
 
@@ -295,16 +315,22 @@ thread_per_node_rpc (void *args)
 	thd_t *thread_ptr = task_ptr->thread_struct;
 	state_t thread_state = DSH_FAILED;
 
+	/* accept SIGALRM */
+	if (sigemptyset (&set))
+		error ("sigemptyset error: %m");
+	if (sigaddset (&set, SIGALRM))
+		error ("sigaddset error on SIGALRM: %m");
+	if (sigprocmask (SIG_UNBLOCK, &set, NULL) != 0)
+		error ("sigprocmask error: %m");
+
+	if (args == NULL)
+		fatal ("thread_per_node_rpc has NULL argument");
 	pthread_mutex_lock (task_ptr->thread_mutex);
 	thread_ptr->state = DSH_ACTIVE;
 	thread_ptr->time = time (NULL);
 	pthread_mutex_unlock (task_ptr->thread_mutex);
 
-	/* accept SIGALRM */
-	sigemptyset (&set);
-	sigaddset (&set, SIGALRM);
-
-	/* init message connection for message communication with slurmd */
+	/* init message connection for message communication */
 	if ( ( sockfd = slurm_open_msg_conn (& thread_ptr->slurm_addr) ) == SLURM_SOCKET_ERROR ) {
 		error ("thread_per_node_rpc/slurm_open_msg_conn error %m");
 		goto cleanup;
@@ -354,7 +380,7 @@ thread_per_node_rpc (void *args)
 cleanup:
 	pthread_mutex_lock (task_ptr->thread_mutex);
 	thread_ptr->state = thread_state;
-	thread_ptr->time = time(NULL) - thread_ptr->time;
+	thread_ptr->time = (time_t) difftime (time (NULL), thread_ptr->time);
 
 	/* Signal completion so another thread can replace us */
 	(*task_ptr->threads_active)--;
diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c
index 1edb70a4272..97473cc3b35 100644
--- a/src/slurmctld/controller.c
+++ b/src/slurmctld/controller.c
@@ -163,7 +163,8 @@ main (int argc, char *argv[])
 		fatal ("pthread_attr_init error %m");
#ifdef PTHREAD_SCOPE_SYSTEM
 	/* we want 1:1 threads if there is a choice */
-	pthread_attr_setscope (&thread_attr_sig, PTHREAD_SCOPE_SYSTEM);
+	if (pthread_attr_setscope (&thread_attr_sig, PTHREAD_SCOPE_SYSTEM))
+		error ("pthread_attr_setscope error %m");
#endif
 	if (pthread_create ( &thread_id_sig, &thread_attr_sig, slurmctld_signal_hand, NULL))
 		fatal ("pthread_create %m");
@@ -178,7 +179,8 @@ main (int argc, char *argv[])
 		fatal ("pthread_attr_init error %m");
#ifdef PTHREAD_SCOPE_SYSTEM
 	/* we want 1:1 threads if there is a choice */
-	pthread_attr_setscope (&thread_attr_rpc, PTHREAD_SCOPE_SYSTEM);
+	if (pthread_attr_setscope (&thread_attr_rpc, PTHREAD_SCOPE_SYSTEM))
+		error ("pthread_attr_setscope error %m");
#endif
 	if (pthread_create ( &thread_id_rpc, &thread_attr_rpc, slurmctld_rpc_mgr, NULL))
 		fatal ("pthread_create error %m");
@@ -274,7 +276,8 @@ slurmctld_rpc_mgr ( void * no_data )
 		fatal ("pthread_attr_setdetachstate %m");
#ifdef PTHREAD_SCOPE_SYSTEM
 	/* we want 1:1 threads if there is a choice */
-	pthread_attr_setscope (&thread_attr_rpc_req, PTHREAD_SCOPE_SYSTEM);
+	if (pthread_attr_setscope (&thread_attr_rpc_req, PTHREAD_SCOPE_SYSTEM))
+		error ("pthread_attr_setscope error %m");
#endif
 
 	/* initialize port for RPCs */
diff --git a/src/slurmctld/node_scheduler.c b/src/slurmctld/node_scheduler.c
index 2288c206f0f..6edafa7555d 100644
--- a/src/slurmctld/node_scheduler.c
+++ b/src/slurmctld/node_scheduler.c
@@ -30,14 +30,19 @@
#endif
 
 #include <errno.h>
+#include <pthread.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <syslog.h>
+#include <unistd.h>
 
 #include <src/common/slurm_errno.h>
+#include <src/common/xmalloc.h>
+#include <src/slurmctld/agent.h>
 #include <src/slurmctld/slurmctld.h>
 
+#define AGENT_TEST 1
 #define BUF_SIZE 1024
 
 struct node_set {	/* set of nodes with same configuration */
@@ -109,20 +114,56 @@ void
 deallocate_nodes (struct job_record * job_ptr)
 {
 	int i;
-	revoke_credential_msg_t revoke_job_cred;
-
+	revoke_credential_msg_t *revoke_job_cred;
+#if AGENT_TEST
+	agent_arg_t *agent_args;
+	pthread_attr_t attr_agent;
+	pthread_t thread_agent;
+
+	agent_args = xmalloc (sizeof (agent_arg_t));
+	agent_args->msg_type = REQUEST_REVOKE_JOB_CREDENTIAL;
+#endif
+	revoke_job_cred = xmalloc (sizeof (revoke_credential_msg_t));
 	last_node_update = time (NULL);
-	revoke_job_cred.job_id = job_ptr -> job_id;
-	revoke_job_cred.expiration_time = job_ptr -> details -> credential . expiration_time ;
-	memset ( (void *)revoke_job_cred.signature, 0, sizeof (revoke_job_cred.signature));
+	revoke_job_cred->job_id = job_ptr->job_id;
+	revoke_job_cred->expiration_time = job_ptr->details->credential.expiration_time;
+	memset ( (void *)revoke_job_cred->signature, 0, sizeof (revoke_job_cred->signature));
 	for (i = 0; i < node_record_count; i++) {
 		if (bit_test (job_ptr->node_bitmap, i) == 0)
 			continue;
-		slurm_revoke_job_cred (&node_record_table_ptr[i], &revoke_job_cred);
+#if AGENT_TEST
+		xrealloc ((agent_args->slurm_addr),
+			  (sizeof (struct sockaddr_in) * (agent_args->addr_count+1)));
+		agent_args->slurm_addr[(agent_args->addr_count++)] =
+			node_record_table_ptr[i].slurm_addr;
+#else
+		slurm_revoke_job_cred (&node_record_table_ptr[i], revoke_job_cred);
+#endif
 		node_record_table_ptr[i].node_state = NODE_STATE_IDLE;
 		bit_set (idle_node_bitmap, i);
 	}
+
+#if AGENT_TEST
+	agent_args->msg_args = revoke_job_cred;
+	debug ("Spawning revoke credential agent");
+	if (pthread_attr_init (&attr_agent))
+		fatal ("pthread_attr_init error %m");
+	if (pthread_attr_setdetachstate (&attr_agent, PTHREAD_CREATE_DETACHED))
+		error ("pthread_attr_setdetachstate error %m");
#ifdef PTHREAD_SCOPE_SYSTEM
+	if (pthread_attr_setscope (&attr_agent, PTHREAD_SCOPE_SYSTEM))
+		error ("pthread_attr_setscope error %m");
#endif
+	if (pthread_create (&thread_agent, &attr_agent, agent, (void *)agent_args)) {
+		error ("pthread_create error %m");
+		sleep (1);	/* sleep and try once more */
+		if (pthread_create (&thread_agent, &attr_agent, agent, (void *)agent_args))
+			fatal ("pthread_create error %m");
+	}
+#else
+	xfree (revoke_job_cred);
+#endif
 	return;
 }
-- 
GitLab
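
The concurrency pattern at the heart of agent() above stands on its own: reserve a slot under a mutex, block on a condition variable while AGENT_THREAD_COUNT workers are already active, and have each worker decrement the count and signal the condition as it finishes. The sketch below is a minimal, self-contained illustration of that pattern, not SLURM code: the names pool_t, worker, THREAD_COUNT, and NTASKS are all invented for the example. Note the while loop around pthread_cond_wait(), the same correction this patch makes to the earlier if.

/* Illustrative sketch of the agent() throttling pattern; all names are
 * invented for the example and nothing here is SLURM API. */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define THREAD_COUNT 4		/* stand-in for AGENT_THREAD_COUNT */
#define NTASKS 10		/* stand-in for agent_arg_ptr->addr_count */

typedef struct {
	pthread_mutex_t lock;
	pthread_cond_t cond;
	int active;		/* number of workers currently running */
} pool_t;

typedef struct {
	pool_t *pool;
	int id;			/* which "node" this worker would contact */
} task_t;

static void *worker (void *arg)
{
	task_t *t = arg;
	pool_t *p = t->pool;
	int id = t->id;
	free (t);

	printf ("task %d running\n", id);
	sleep (1);		/* simulate the per-node RPC round trip */

	/* signal completion so the spawning loop can start another thread */
	pthread_mutex_lock (&p->lock);
	p->active--;
	pthread_cond_signal (&p->cond);
	pthread_mutex_unlock (&p->lock);
	return NULL;
}

int main (void)
{
	pool_t pool = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0 };
	pthread_attr_t attr;
	pthread_t tid;
	int i;

	pthread_attr_init (&attr);
	pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);

	for (i = 0; i < NTASKS; i++) {
		task_t *t = malloc (sizeof (task_t));
		t->pool = &pool;
		t->id = i;

		/* wait until there is "room" for another thread; the while
		 * loop (not an if) also covers spurious wakeups */
		pthread_mutex_lock (&pool.lock);
		while (pool.active >= THREAD_COUNT)
			pthread_cond_wait (&pool.cond, &pool.lock);
		pool.active++;
		pthread_mutex_unlock (&pool.lock);

		if (pthread_create (&tid, &attr, worker, t)) {
			perror ("pthread_create");
			pthread_mutex_lock (&pool.lock);
			pool.active--;	/* undo the reservation */
			pthread_cond_signal (&pool.cond);
			pthread_mutex_unlock (&pool.lock);
			free (t);
		}
	}

	/* wait for the last workers to drain, as agent() does */
	pthread_mutex_lock (&pool.lock);
	while (pool.active > 0)
		pthread_cond_wait (&pool.cond, &pool.lock);
	pthread_mutex_unlock (&pool.lock);
	return 0;
}

Built with cc -pthread, this runs ten one-second tasks while never holding more than four threads active, mirroring the throttling loop in agent().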
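
The watchdog in wdog() relies on a second reusable mechanism: each worker records its start time under the shared mutex, and a polling loop compares difftime(time(NULL), start) against a timeout, delivering SIGALRM via pthread_kill() to any overdue thread so that its blocking call returns early. Below is a minimal sketch of just that mechanism under stated assumptions: the names th_t, worker, WDOG_POLL_SEC, and CMD_TIMEOUT are invented, a sleep(30) stands in for a hung RPC, and the polling loop runs in main() rather than a separate thread purely to keep the example short. The one subtle requirement is installing a do-nothing SIGALRM handler without SA_RESTART, so the signal interrupts the blocking call instead of killing the process.

/* Illustrative sketch of the wdog() timeout mechanism; all names are
 * invented for the example and nothing here is SLURM API. */
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#define WDOG_POLL_SEC 2		/* stand-in for WDOG_POLL */
#define CMD_TIMEOUT 3		/* stand-in for COMMAND_TIMEOUT (seconds) */

typedef struct {
	pthread_t tid;
	pthread_mutex_t lock;
	time_t start;		/* when the worker went "active" */
	int active;
} th_t;

static void on_alarm (int sig)
{
	(void) sig;		/* no-op: only here to interrupt a blocking call */
}

static void *worker (void *arg)
{
	th_t *th = arg;
	unsigned int left;

	pthread_mutex_lock (&th->lock);
	th->active = 1;
	th->start = time (NULL);
	pthread_mutex_unlock (&th->lock);

	/* stands in for a hung RPC; SIGALRM cuts the sleep short */
	left = sleep (30);
	printf ("worker interrupted with %u seconds left\n", left);

	pthread_mutex_lock (&th->lock);
	th->active = 0;
	pthread_mutex_unlock (&th->lock);
	return NULL;
}

int main (void)
{
	th_t th;
	struct sigaction sa;
	int overdue, done;

	/* do-nothing handler installed WITHOUT SA_RESTART, so SIGALRM makes
	 * blocking calls return early instead of killing the process */
	memset (&sa, 0, sizeof (sa));
	sa.sa_handler = on_alarm;
	sigaction (SIGALRM, &sa, NULL);

	memset (&th, 0, sizeof (th));
	pthread_mutex_init (&th.lock, NULL);
	pthread_create (&th.tid, NULL, worker, &th);

	/* the watchdog loop, as in wdog(): poll, then signal overdue threads */
	for (;;) {
		sleep (WDOG_POLL_SEC);
		pthread_mutex_lock (&th.lock);
		overdue = th.active &&
			  (difftime (time (NULL), th.start) >= CMD_TIMEOUT);
		done = !th.active && (th.start != 0);
		pthread_mutex_unlock (&th.lock);
		if (overdue)
			pthread_kill (th.tid, SIGALRM);
		if (done)
			break;
	}
	pthread_join (th.tid, NULL);
	return 0;
}

Compiled with cc -pthread, the worker is cut off after roughly CMD_TIMEOUT seconds instead of sleeping the full 30, which is exactly how the watchdog unsticks a thread blocked on an unresponsive node.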