From f831aa9bcc8b413cf626573c36745720daf65ac4 Mon Sep 17 00:00:00 2001 From: Moe Jette <jette1@llnl.gov> Date: Fri, 24 Jan 2003 18:10:29 +0000 Subject: [PATCH] General code clean-up. No changes to logic. --- src/slurmctld/agent.c | 98 +++++++++++++++++++++++++++---------------- 1 file changed, 62 insertions(+), 36 deletions(-) diff --git a/src/slurmctld/agent.c b/src/slurmctld/agent.c index 9bf654b0718..8cfe0b7f2af 100644 --- a/src/slurmctld/agent.c +++ b/src/slurmctld/agent.c @@ -39,12 +39,12 @@ * * The main agent thread creates a separate thread for each node to be * communicated with up to AGENT_THREAD_COUNT. A special watchdog thread - * sendsSIGLARM to any threads that have been active (in DSH_ACTIVE state) + * sends SIGLARM to any threads that have been active (in DSH_ACTIVE state) * for more than COMMAND_TIMEOUT seconds. - * The agent responds to slurmctld via an RPC as required. + * The agent responds to slurmctld via a function call or an RPC as required. * For example, informing slurmctld that some node is not responding. * - * All the state for each thread is maintailed in thd_t struct, which is + * All the state for each thread is maintained in thd_t struct, which is * used by the watchdog thread as well as the communication threads. \*****************************************************************************/ @@ -93,6 +93,7 @@ typedef struct agent_info { uint32_t threads_active; /* currently active threads */ uint16_t retry; /* if set, keep trying */ thd_t *thread_struct; /* thread structures */ + bool get_reply; /* flag if reply expected */ slurm_msg_type_t msg_type; /* RPC to be issued */ void **msg_args_pptr; /* RPC data to be used */ } agent_info_t; @@ -104,12 +105,15 @@ typedef struct task_info { * condition */ uint32_t *threads_active_ptr; /* currently active thread ptr */ thd_t *thread_struct_ptr; /* thread structures ptr */ + bool get_reply; /* flag if reply expected */ slurm_msg_type_t msg_type; /* RPC to be issued */ void *msg_args_ptr; /* ptr to RPC data to be used */ } task_info_t; static void _alarm_handler(int dummy); static void _list_delete_retry(void *retry_entry); +static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr); +static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx); static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count); static void _slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg); static void _spawn_retry_agent(agent_arg_t * agent_arg_ptr); @@ -142,25 +146,9 @@ void *agent(void *args) if (_valid_agent_arg(agent_arg_ptr)) goto cleanup; - /* initialize the data structures */ - agent_info_ptr = xmalloc(sizeof(agent_info_t)); - slurm_mutex_init(&agent_info_ptr->thread_mutex); - if (pthread_cond_init(&agent_info_ptr->thread_cond, NULL)) - fatal("pthread_cond_init error %m"); - agent_info_ptr->thread_count = agent_arg_ptr->node_count; - agent_info_ptr->retry = agent_arg_ptr->retry; - agent_info_ptr->threads_active = 0; - thread_ptr = xmalloc(agent_arg_ptr->node_count * sizeof(thd_t)); - agent_info_ptr->thread_struct = thread_ptr; - agent_info_ptr->msg_type = agent_arg_ptr->msg_type; - agent_info_ptr->msg_args_pptr = &agent_arg_ptr->msg_args; - for (i = 0; i < agent_info_ptr->thread_count; i++) { - thread_ptr[i].state = DSH_NEW; - thread_ptr[i].slurm_addr = agent_arg_ptr->slurm_addr[i]; - strncpy(thread_ptr[i].node_name, - &agent_arg_ptr->node_names[i * MAX_NAME_LEN], - MAX_NAME_LEN); - } + /* initialize the agent data structures */ + agent_info_ptr = _make_agent_info(agent_arg_ptr); + thread_ptr = agent_info_ptr->thread_struct; /* start the watchdog thread */ if (pthread_attr_init(&attr_wdog)) @@ -193,19 +181,9 @@ void *agent(void *args) &agent_info_ptr->thread_mutex); } - /* create thread specific data, NOTE freed from + /* create thread specific data, NOTE: freed from * _thread_per_node_rpc() */ - task_specific_ptr = xmalloc(sizeof(task_info_t)); - task_specific_ptr->thread_mutex_ptr = - &agent_info_ptr->thread_mutex; - task_specific_ptr->thread_cond_ptr = - &agent_info_ptr->thread_cond; - task_specific_ptr->threads_active_ptr = - &agent_info_ptr->threads_active; - task_specific_ptr->thread_struct_ptr = &thread_ptr[i]; - task_specific_ptr->msg_type = agent_info_ptr->msg_type; - task_specific_ptr->msg_args_ptr = - *agent_info_ptr->msg_args_pptr; + task_specific_ptr = _make_task_data(agent_info_ptr, i); if (pthread_attr_init(&thread_ptr[i].attr)) fatal("pthread_attr_init error %m"); @@ -291,6 +269,54 @@ static int _valid_agent_arg(agent_arg_t *agent_arg_ptr) return SLURM_SUCCESS; } +static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr) +{ + int i; + agent_info_t *agent_info_ptr; + thd_t *thread_ptr; + + agent_info_ptr = xmalloc(sizeof(agent_info_t)); + + slurm_mutex_init(&agent_info_ptr->thread_mutex); + if (pthread_cond_init(&agent_info_ptr->thread_cond, NULL)) + fatal("pthread_cond_init error %m"); + agent_info_ptr->thread_count = agent_arg_ptr->node_count; + agent_info_ptr->retry = agent_arg_ptr->retry; + agent_info_ptr->threads_active = 0; + thread_ptr = xmalloc(agent_arg_ptr->node_count * sizeof(thd_t)); + agent_info_ptr->thread_struct = thread_ptr; + agent_info_ptr->msg_type = agent_arg_ptr->msg_type; + agent_info_ptr->msg_args_pptr = &agent_arg_ptr->msg_args; + if ((agent_arg_ptr->msg_type != REQUEST_SHUTDOWN) && + (agent_arg_ptr->msg_type != REQUEST_RECONFIGURE)) + agent_info_ptr->get_reply = true; + for (i = 0; i < agent_info_ptr->thread_count; i++) { + thread_ptr[i].state = DSH_NEW; + thread_ptr[i].slurm_addr = agent_arg_ptr->slurm_addr[i]; + strncpy(thread_ptr[i].node_name, + &agent_arg_ptr->node_names[i * MAX_NAME_LEN], + MAX_NAME_LEN); + } + + return agent_info_ptr; +} + +static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx) +{ + task_info_t *task_info_ptr; + task_info_ptr = xmalloc(sizeof(task_info_t)); + + task_info_ptr->thread_mutex_ptr = &agent_info_ptr->thread_mutex; + task_info_ptr->thread_cond_ptr = &agent_info_ptr->thread_cond; + task_info_ptr->threads_active_ptr= &agent_info_ptr->threads_active; + task_info_ptr->thread_struct_ptr = &agent_info_ptr->thread_struct[inx]; + task_info_ptr->get_reply = agent_info_ptr->get_reply; + task_info_ptr->msg_type = agent_info_ptr->msg_type; + task_info_ptr->msg_args_ptr = *agent_info_ptr->msg_args_pptr; + + return task_info_ptr; +} + /* * _wdog - Watchdog thread. Send SIGALRM to threads which have been active * for too long. @@ -480,7 +506,7 @@ static void *_thread_per_node_rpc(void *args) } /* receive message as needed (most message types) */ - if ((task_ptr->msg_type != REQUEST_SHUTDOWN) && + if (task_ptr->get_reply && ((msg_size = slurm_receive_msg(sockfd, response_msg)) == SLURM_SOCKET_ERROR)) { error( @@ -496,7 +522,7 @@ static void *_thread_per_node_rpc(void *args) thread_ptr->node_name); goto cleanup; } - if (task_ptr->msg_type == REQUEST_SHUTDOWN) { + if (!task_ptr->get_reply) { thread_state = DSH_DONE; goto cleanup; } -- GitLab