From 69de51d9aa5a04fc093a92c61ddcc6e43e1a7364 Mon Sep 17 00:00:00 2001 From: Moe Jette <jette1@llnl.gov> Date: Sat, 16 Nov 2002 01:05:48 +0000 Subject: [PATCH] General code clean-up --- src/slurmctld/agent.c | 181 +++++++++++++++++++++++------------------- 1 file changed, 101 insertions(+), 80 deletions(-) diff --git a/src/slurmctld/agent.c b/src/slurmctld/agent.c index 2daeee237d0..4eea080a57e 100644 --- a/src/slurmctld/agent.c +++ b/src/slurmctld/agent.c @@ -77,7 +77,8 @@ typedef struct thd { pthread_t thread; /* thread ID */ pthread_attr_t attr; /* thread attributes */ state_t state; /* thread state */ - time_t time; /* time stamp for start or delta time */ + time_t time; /* start time or delta time + * at termination */ struct sockaddr_in slurm_addr; /* network address */ char node_name[MAX_NAME_LEN];/* node's name */ } thd_t; @@ -86,7 +87,7 @@ typedef struct agent_info { pthread_mutex_t thread_mutex; /* agent specific mutex */ pthread_cond_t thread_cond; /* agent specific condition */ uint32_t thread_count; /* number of threads records */ - uint32_t threads_active; /* count of currently active threads */ + uint32_t threads_active; /* currently active threads */ uint16_t retry; /* if set, keep trying */ thd_t *thread_struct; /* thread structures */ slurm_msg_type_t msg_type; /* RPC to be issued */ @@ -94,32 +95,34 @@ typedef struct agent_info { } agent_info_t; typedef struct task_info { - pthread_mutex_t *thread_mutex_ptr; /* pointer to agent specific mutex */ - pthread_cond_t *thread_cond_ptr; /* pointer to agent specific condition */ - uint32_t *threads_active_ptr; /* pointer to count of currently active threads */ - thd_t *thread_struct_ptr; /* pointer to thread structures */ + pthread_mutex_t *thread_mutex_ptr; /* pointer to agent specific + * mutex */ + pthread_cond_t *thread_cond_ptr; /* pointer to agent specific + * condition */ + uint32_t *threads_active_ptr; /* currently active thread ptr */ + thd_t *thread_struct_ptr; /* thread structures ptr */ slurm_msg_type_t msg_type; /* RPC to be issued */ - void *msg_args_ptr; /* pointer to RPC data to be used */ + void *msg_args_ptr; /* ptr to RPC data to be used */ } task_info_t; -static void alarm_handler(int dummy); -static void list_delete_retry (void *retry_entry); -static void queue_agent_retry (agent_info_t *agent_info_ptr, int count); -static void slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg); -static void spawn_retry_agent (agent_arg_t *agent_arg_ptr); -static void *thread_per_node_rpc (void *args); -static void *wdog (void *args); -static void xsignal(int signal, void (*handler)(int)); +static void _alarm_handler(int dummy); +static void _list_delete_retry (void *retry_entry); +static void _queue_agent_retry (agent_info_t *agent_info_ptr, int count); +static void _slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg); +static void _spawn_retry_agent (agent_arg_t *agent_arg_ptr); +static void *_thread_per_node_rpc (void *args); +static void *_wdog (void *args); +static void _xsignal(int signal, void (*handler)(int)); /* retry RPC data structures */ pthread_mutex_t retry_mutex = PTHREAD_MUTEX_INITIALIZER; List retry_list = NULL; /* agent_arg_t list for retry */ /* - * agent - party responsible for transmitting an common RPC in parallel across a set - * of nodes - * input: pointer to agent_arg_t, which is xfree'd (including slurm_addr, node_names, - * and msg_args) upon completion if AGENT_IS_THREAD is set + * agent - party responsible for transmitting an common RPC in parallel + * across a set of nodes + * input: pointer to agent_arg_t, which is xfree'd (including slurm_addr, + * node_names and msg_args) upon completion if AGENT_IS_THREAD is set */ void * agent (void *args) @@ -145,7 +148,8 @@ agent (void *args) (agent_arg_ptr->msg_type != REQUEST_NODE_REGISTRATION_STATUS) && (agent_arg_ptr->msg_type != REQUEST_PING) && (agent_arg_ptr->msg_type != REQUEST_BATCH_JOB_LAUNCH)) - fatal ("agent passed invalid message type %d", agent_arg_ptr->msg_type); + fatal ("agent passed invalid message type %d", + agent_arg_ptr->msg_type); /* initialize the data structures */ agent_info_ptr = xmalloc (sizeof (agent_info_t)); @@ -164,7 +168,8 @@ agent (void *args) thread_ptr[i].state = DSH_NEW; thread_ptr[i].slurm_addr = agent_arg_ptr->slurm_addr[i]; strncpy (thread_ptr[i].node_name, - &agent_arg_ptr->node_names[i*MAX_NAME_LEN], MAX_NAME_LEN); + &agent_arg_ptr->node_names[i*MAX_NAME_LEN], + MAX_NAME_LEN); } /* start the watchdog thread */ @@ -176,17 +181,18 @@ agent (void *args) if (pthread_attr_setscope (&attr_wdog, PTHREAD_SCOPE_SYSTEM)) error ("pthread_attr_setscope error %m"); #endif - if (pthread_create (&thread_wdog, &attr_wdog, wdog, (void *)agent_info_ptr)) { + if (pthread_create (&thread_wdog, &attr_wdog, _wdog, + (void *)agent_info_ptr)) { error ("pthread_create error %m"); sleep (1); /* sleep and try once more */ - if (pthread_create (&thread_wdog, &attr_wdog, wdog, args)) + if (pthread_create (&thread_wdog, &attr_wdog, _wdog, args)) fatal ("pthread_create error %m"); } #if AGENT_THREAD_COUNT < 1 fatal ("AGENT_THREAD_COUNT value is invalid"); #endif - /* start all the other threads (up to AGENT_THREAD_COUNT active at once) */ + /* start all the other threads (up to AGENT_THREAD_COUNT active) */ for (i = 0; i < agent_info_ptr->thread_count; i++) { /* wait until "room" for another thread */ @@ -196,26 +202,35 @@ agent (void *args) &agent_info_ptr->thread_mutex); } - /* create thread specific data, NOTE freed from thread_per_node_rpc() */ - task_specific_ptr = xmalloc (sizeof (task_info_t)); - task_specific_ptr->thread_mutex_ptr = &agent_info_ptr->thread_mutex; - task_specific_ptr->thread_cond_ptr = &agent_info_ptr->thread_cond; - task_specific_ptr->threads_active_ptr = &agent_info_ptr->threads_active; + /* create thread specific data, NOTE freed from + * _thread_per_node_rpc() */ + task_specific_ptr = + xmalloc (sizeof (task_info_t)); + task_specific_ptr->thread_mutex_ptr = + &agent_info_ptr->thread_mutex; + task_specific_ptr->thread_cond_ptr = + &agent_info_ptr->thread_cond; + task_specific_ptr->threads_active_ptr = + &agent_info_ptr->threads_active; task_specific_ptr->thread_struct_ptr = &thread_ptr[i]; - task_specific_ptr->msg_type = agent_info_ptr->msg_type; - task_specific_ptr->msg_args_ptr = *agent_info_ptr->msg_args_pptr; + task_specific_ptr->msg_type = + agent_info_ptr->msg_type; + task_specific_ptr->msg_args_ptr = + *agent_info_ptr->msg_args_pptr; if (pthread_attr_init (&thread_ptr[i].attr)) fatal ("pthread_attr_init error %m"); - if (pthread_attr_setdetachstate (&thread_ptr[i].attr, PTHREAD_CREATE_DETACHED)) + if (pthread_attr_setdetachstate (&thread_ptr[i].attr, + PTHREAD_CREATE_DETACHED)) error ("pthread_attr_setdetachstate error %m"); #ifdef PTHREAD_SCOPE_SYSTEM - if (pthread_attr_setscope (&thread_ptr[i].attr, PTHREAD_SCOPE_SYSTEM)) + if (pthread_attr_setscope (&thread_ptr[i].attr, + PTHREAD_SCOPE_SYSTEM)) error ("pthread_attr_setscope error %m"); #endif while ( (rc = pthread_create (&thread_ptr[i].thread, &thread_ptr[i].attr, - thread_per_node_rpc, + _thread_per_node_rpc, (void *) task_specific_ptr)) ) { error ("pthread_create error %m"); if (agent_info_ptr->threads_active) @@ -244,7 +259,7 @@ cleanup: xfree (agent_arg_ptr->node_names); if (agent_arg_ptr->msg_args) { if (agent_arg_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH) - slurmctld_free_job_launch_msg (agent_arg_ptr->msg_args); + _slurmctld_free_job_launch_msg (agent_arg_ptr->msg_args); else xfree (agent_arg_ptr->msg_args); } @@ -261,11 +276,11 @@ cleanup: } /* - * wdog - Watchdog thread. Send SIGALRM to threads which have been active for too long. + * _wdog - Watchdog thread. Send SIGALRM to threads which have been active for too long. * Sleep for WDOG_POLL seconds between polls. */ static void * -wdog (void *args) +_wdog (void *args) { int i, fail_cnt, work_done, delay, max_delay = 0; agent_info_t *agent_ptr = (agent_info_t *) args; @@ -288,9 +303,11 @@ wdog (void *args) switch (thread_ptr[i].state) { case DSH_ACTIVE: work_done = 0; - delay = difftime (time (NULL), thread_ptr[i].time); + delay = difftime (time (NULL), + thread_ptr[i].time); if ( delay >= COMMAND_TIMEOUT) - pthread_kill(thread_ptr[i].thread, SIGALRM); + pthread_kill(thread_ptr[i].thread, + SIGALRM); break; case DSH_NEW: work_done = 0; @@ -320,14 +337,15 @@ wdog (void *args) } unlock_slurmctld (node_write_lock); #else - /* Build a list of all non-responding nodes and send it to slurmctld */ + /* Build a list of all non-responding nodes and send + * it to slurmctld */ slurm_names = xmalloc (fail_cnt * MAX_NAME_LEN); fail_cnt = 0; for (i = 0; i < agent_ptr->thread_count; i++) { if (thread_ptr[i].state == DSH_FAILED) { strncpy (&slurm_names[MAX_NAME_LEN * fail_cnt], thread_ptr[i].node_name, MAX_NAME_LEN); - error ("agent/wdog: node %s failed to respond", + error ("agent/_wdog: node %s failed to respond", thread_ptr[i].node_name); fail_cnt++; } @@ -339,7 +357,7 @@ wdog (void *args) xfree (slurm_names); #endif if (agent_ptr->retry) - queue_agent_retry (agent_ptr, fail_cnt); + _queue_agent_retry (agent_ptr, fail_cnt); } #if AGENT_IS_THREAD @@ -351,7 +369,8 @@ wdog (void *args) } unlock_slurmctld (node_write_lock); #else - /* Build a list of all responding nodes and send it to slurmctld to update time stamps */ + /* Build a list of all responding nodes and send it to slurmctld to + * update time stamps */ done_cnt = agent_ptr->thread_count - fail_cnt; slurm_names = xmalloc (done_cnt * MAX_NAME_LEN); done_cnt = 0; @@ -375,10 +394,10 @@ wdog (void *args) return (void *) NULL; } -/* thread_per_node_rpc - thread to revoke a credential on a collection of nodes +/* _thread_per_node_rpc - thread to revoke a credential on a collection of nodes * This xfrees the argument passed to it */ static void * -thread_per_node_rpc (void *args) +_thread_per_node_rpc (void *args) { int msg_size ; int rc ; @@ -398,42 +417,45 @@ thread_per_node_rpc (void *args) error ("sigaddset error on SIGALRM: %m"); if (sigprocmask (SIG_UNBLOCK, &set, NULL) != 0) fatal ("sigprocmask error: %m"); - xsignal(SIGALRM, alarm_handler); + _xsignal(SIGALRM, _alarm_handler); if (args == NULL) - fatal ("thread_per_node_rpc has NULL argument"); + fatal ("_thread_per_node_rpc has NULL argument"); pthread_mutex_lock (task_ptr->thread_mutex_ptr); thread_ptr->state = DSH_ACTIVE; thread_ptr->time = time (NULL); pthread_mutex_unlock (task_ptr->thread_mutex_ptr); /* init message connection for message communication */ - if ( ( sockfd = slurm_open_msg_conn (& thread_ptr->slurm_addr) ) == SLURM_SOCKET_ERROR ) { - error ("thread_per_node_rpc/slurm_open_msg_conn error %m"); + if ( ( sockfd = slurm_open_msg_conn (& thread_ptr->slurm_addr) ) + == SLURM_SOCKET_ERROR ) { + error ("_thread_per_node_rpc/slurm_open_msg_conn error %m"); goto cleanup; } /* send request message */ request_msg . msg_type = task_ptr->msg_type ; request_msg . data = task_ptr->msg_args_ptr ; - if ( ( rc = slurm_send_node_msg ( sockfd , & request_msg ) ) == SLURM_SOCKET_ERROR ) { - error ("thread_per_node_rpc/slurm_send_node_msg error %m"); + if ( ( rc = slurm_send_node_msg ( sockfd , & request_msg ) ) + == SLURM_SOCKET_ERROR ) { + error ("_thread_per_node_rpc/slurm_send_node_msg error %m"); goto cleanup; } /* receive message */ - if ( ( msg_size = slurm_receive_msg ( sockfd , & response_msg ) ) == SLURM_SOCKET_ERROR ) { - error ("thread_per_node_rpc/slurm_receive_msg error %m"); + if ( ( msg_size = slurm_receive_msg ( sockfd , & response_msg ) ) + == SLURM_SOCKET_ERROR ) { + error ("_thread_per_node_rpc/slurm_receive_msg error %m"); goto cleanup; } /* shutdown message connection */ if ( ( rc = slurm_shutdown_msg_conn ( sockfd ) ) == SLURM_SOCKET_ERROR ) { - error ("thread_per_node_rpc/slurm_shutdown_msg_conn error %m"); + error ("_thread_per_node_rpc/slurm_shutdown_msg_conn error %m"); goto cleanup; } if ( msg_size ) { - error ("thread_per_node_rpc/msg_size error %d", msg_size); + error ("_thread_per_node_rpc/msg_size error %d", msg_size); goto cleanup; } @@ -444,8 +466,8 @@ thread_per_node_rpc (void *args) rc = slurm_rc_msg->return_code; slurm_free_return_code_msg ( slurm_rc_msg ); if (rc) - error ("thread_per_node_rpc/rc error %s", - slurm_strerror (rc)); + error ("_thread_per_node_rpc/rc error %s", + slurm_strerror (rc)); /* Don't use %m */ else { debug3 ("agent sucessfully processed RPC to node %s", thread_ptr->node_name); @@ -453,7 +475,8 @@ thread_per_node_rpc (void *args) thread_state = DSH_DONE; break ; default: - error ("thread_per_node_rpc bad msg_type %d",response_msg.msg_type); + error ("_thread_per_node_rpc bad msg_type %d", + response_msg.msg_type); break ; } @@ -475,7 +498,7 @@ cleanup: * Emulate signal() but with BSD semantics (i.e. don't restore signal to * SIGDFL prior to executing handler). */ -static void xsignal(int signal, void (*handler)(int)) +static void _xsignal(int signal, void (*handler)(int)) { struct sigaction sa, old_sa; @@ -491,14 +514,14 @@ static void xsignal(int signal, void (*handler)(int)) * in interrupting connect() in k4cmd/rcmd or select() in rsh() below and * causing them to return EINTR. */ -static void alarm_handler(int dummy) +static void _alarm_handler(int dummy) { } -/* queue_agent_retry - Queue any failed RPCs for later replay */ -void -queue_agent_retry (agent_info_t *agent_info_ptr, int count) +/* _queue_agent_retry - Queue any failed RPCs for later replay */ +static void +_queue_agent_retry (agent_info_t *agent_info_ptr, int count) { agent_arg_t *agent_arg_ptr; thd_t *thread_ptr = agent_info_ptr->thread_struct; @@ -511,7 +534,8 @@ queue_agent_retry (agent_info_t *agent_info_ptr, int count) agent_arg_ptr = xmalloc (sizeof (agent_arg_t)); agent_arg_ptr -> node_count = count; agent_arg_ptr -> retry = 1; - agent_arg_ptr -> slurm_addr = xmalloc (sizeof (struct sockaddr_in) * count); + agent_arg_ptr -> slurm_addr = xmalloc (sizeof (struct sockaddr_in) + * count); agent_arg_ptr -> node_names = xmalloc (MAX_NAME_LEN * count); agent_arg_ptr -> msg_type = agent_info_ptr -> msg_type; agent_arg_ptr -> msg_args = *(agent_info_ptr -> msg_args_pptr); @@ -531,7 +555,7 @@ queue_agent_retry (agent_info_t *agent_info_ptr, int count) /* add the requeust to a list */ pthread_mutex_lock (&retry_mutex); if (retry_list == NULL) { - retry_list = list_create (&list_delete_retry); + retry_list = list_create (&_list_delete_retry); if (retry_list == NULL) fatal ("list_create failed"); } @@ -541,11 +565,10 @@ queue_agent_retry (agent_info_t *agent_info_ptr, int count) } /* - * list_delete_retry - delete an entry from the retry list, + * _list_delete_retry - delete an entry from the retry list, * see common/list.h for documentation */ -void -list_delete_retry (void *retry_entry) +static void _list_delete_retry (void *retry_entry) { agent_arg_t *agent_arg_ptr; /* pointer to part_record */ @@ -564,8 +587,7 @@ list_delete_retry (void *retry_entry) /* agent_retry - Agent for retrying pending RPCs (top one on the queue), * argument is unused */ -void * -agent_retry (void *args) +void * agent_retry (void *args) { agent_arg_t *agent_arg_ptr = NULL; @@ -575,14 +597,13 @@ agent_retry (void *args) pthread_mutex_unlock (&retry_mutex); if (agent_arg_ptr) - spawn_retry_agent (agent_arg_ptr); + _spawn_retry_agent (agent_arg_ptr); return NULL; } /* retry_pending - retry all pending RPCs for the given node name */ -void -retry_pending (char *node_name) +void retry_pending (char *node_name) { int list_size = 0, i, j, found; agent_arg_t *agent_arg_ptr = NULL; @@ -602,16 +623,15 @@ retry_pending (char *node_name) break; } if (found) /* issue this RPC */ - spawn_retry_agent (agent_arg_ptr); + _spawn_retry_agent (agent_arg_ptr); else /* put the RPC back on the queue */ list_enqueue (retry_list, (void*) agent_arg_ptr); } pthread_mutex_unlock (&retry_mutex); } -/* spawn_retry_agent - pthread_crate an agent for the given task */ -void -spawn_retry_agent (agent_arg_t *agent_arg_ptr) +/* _spawn_retry_agent - pthread_crate an agent for the given task */ +static void _spawn_retry_agent (agent_arg_t *agent_arg_ptr) { pthread_attr_t attr_agent; pthread_t thread_agent; @@ -622,7 +642,8 @@ spawn_retry_agent (agent_arg_t *agent_arg_ptr) debug3 ("Spawning RPC retry agent"); if (pthread_attr_init (&attr_agent)) fatal ("pthread_attr_init error %m"); - if (pthread_attr_setdetachstate (&attr_agent, PTHREAD_CREATE_DETACHED)) + if (pthread_attr_setdetachstate (&attr_agent, + PTHREAD_CREATE_DETACHED)) error ("pthread_attr_setdetachstate error %m"); #ifdef PTHREAD_SCOPE_SYSTEM if (pthread_attr_setscope (&attr_agent, PTHREAD_SCOPE_SYSTEM)) @@ -638,12 +659,12 @@ spawn_retry_agent (agent_arg_t *agent_arg_ptr) } } -/* slurmctld_free_job_launch_msg is a variant of slurm_free_job_launch_msg +/* _slurmctld_free_job_launch_msg is a variant of slurm_free_job_launch_msg * because all environment variables currently loaded in one xmalloc * buffer (see get_job_env()), which is different from how slurmd * assembles the data from a message */ -void slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg) +static void _slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg) { if (msg) { if (msg->environment) { -- GitLab