Skip to content
Snippets Groups Projects
Commit f831aa9b authored by Moe Jette's avatar Moe Jette
Browse files

General code clean-up. No changes to logic.

parent a9f4ad05
No related branches found
No related tags found
No related merge requests found
......@@ -39,12 +39,12 @@
*
* The main agent thread creates a separate thread for each node to be
* communicated with up to AGENT_THREAD_COUNT. A special watchdog thread
* sendsSIGLARM to any threads that have been active (in DSH_ACTIVE state)
* sends SIGALRM to any threads that have been active (in DSH_ACTIVE state)
* for more than COMMAND_TIMEOUT seconds.
* The agent responds to slurmctld via an RPC as required.
* The agent responds to slurmctld via a function call or an RPC as required.
* For example, informing slurmctld that some node is not responding.
*
* All the state for each thread is maintailed in thd_t struct, which is
* All the state for each thread is maintained in thd_t struct, which is
* used by the watchdog thread as well as the communication threads.
\*****************************************************************************/
......@@ -93,6 +93,7 @@ typedef struct agent_info {
uint32_t threads_active; /* currently active threads */
uint16_t retry; /* if set, keep trying */
thd_t *thread_struct; /* thread structures */
bool get_reply; /* flag if reply expected */
slurm_msg_type_t msg_type; /* RPC to be issued */
void **msg_args_pptr; /* RPC data to be used */
} agent_info_t;
......@@ -104,12 +105,15 @@ typedef struct task_info {
* condition */
uint32_t *threads_active_ptr; /* currently active thread ptr */
thd_t *thread_struct_ptr; /* thread structures ptr */
bool get_reply; /* flag if reply expected */
slurm_msg_type_t msg_type; /* RPC to be issued */
void *msg_args_ptr; /* ptr to RPC data to be used */
} task_info_t;
static void _alarm_handler(int dummy);
static void _list_delete_retry(void *retry_entry);
static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr);
static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx);
static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count);
static void _slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg);
static void _spawn_retry_agent(agent_arg_t * agent_arg_ptr);
......@@ -142,25 +146,9 @@ void *agent(void *args)
if (_valid_agent_arg(agent_arg_ptr))
goto cleanup;
/* initialize the data structures */
agent_info_ptr = xmalloc(sizeof(agent_info_t));
slurm_mutex_init(&agent_info_ptr->thread_mutex);
if (pthread_cond_init(&agent_info_ptr->thread_cond, NULL))
fatal("pthread_cond_init error %m");
agent_info_ptr->thread_count = agent_arg_ptr->node_count;
agent_info_ptr->retry = agent_arg_ptr->retry;
agent_info_ptr->threads_active = 0;
thread_ptr = xmalloc(agent_arg_ptr->node_count * sizeof(thd_t));
agent_info_ptr->thread_struct = thread_ptr;
agent_info_ptr->msg_type = agent_arg_ptr->msg_type;
agent_info_ptr->msg_args_pptr = &agent_arg_ptr->msg_args;
for (i = 0; i < agent_info_ptr->thread_count; i++) {
thread_ptr[i].state = DSH_NEW;
thread_ptr[i].slurm_addr = agent_arg_ptr->slurm_addr[i];
strncpy(thread_ptr[i].node_name,
&agent_arg_ptr->node_names[i * MAX_NAME_LEN],
MAX_NAME_LEN);
}
/* initialize the agent data structures */
agent_info_ptr = _make_agent_info(agent_arg_ptr);
thread_ptr = agent_info_ptr->thread_struct;
/* start the watchdog thread */
if (pthread_attr_init(&attr_wdog))
......@@ -193,19 +181,9 @@ void *agent(void *args)
&agent_info_ptr->thread_mutex);
}
/* create thread specific data, NOTE freed from
/* create thread specific data, NOTE: freed from
* _thread_per_node_rpc() */
task_specific_ptr = xmalloc(sizeof(task_info_t));
task_specific_ptr->thread_mutex_ptr =
&agent_info_ptr->thread_mutex;
task_specific_ptr->thread_cond_ptr =
&agent_info_ptr->thread_cond;
task_specific_ptr->threads_active_ptr =
&agent_info_ptr->threads_active;
task_specific_ptr->thread_struct_ptr = &thread_ptr[i];
task_specific_ptr->msg_type = agent_info_ptr->msg_type;
task_specific_ptr->msg_args_ptr =
*agent_info_ptr->msg_args_pptr;
task_specific_ptr = _make_task_data(agent_info_ptr, i);
if (pthread_attr_init(&thread_ptr[i].attr))
fatal("pthread_attr_init error %m");
......@@ -291,6 +269,54 @@ static int _valid_agent_arg(agent_arg_t *agent_arg_ptr)
return SLURM_SUCCESS;
}
/*
 * _make_agent_info - allocate and populate the agent_info_t for one agent
 *	request, including one thd_t record per node to be contacted.
 * agent_arg_ptr - the agent's arguments: node_count, retry, msg_type,
 *	msg_args, slurm_addr[] and the packed node_names buffer
 *	(one MAX_NAME_LEN-byte slot per node).
 * Returns the new agent_info_t; its thread_struct member points to a
 *	separately allocated array of node_count thd_t records, each in
 *	state DSH_NEW.
 * NOTE: msg_args_pptr stores the address of agent_arg_ptr->msg_args, so
 *	agent_arg_ptr must remain valid for as long as the result is used.
 */
static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr)
{
	int i;
	agent_info_t *agent_info_ptr;
	thd_t *thread_ptr;

	agent_info_ptr = xmalloc(sizeof(agent_info_t));

	/* synchronization objects shared by every per-node thread */
	slurm_mutex_init(&agent_info_ptr->thread_mutex);
	if (pthread_cond_init(&agent_info_ptr->thread_cond, NULL))
		fatal("pthread_cond_init error %m");

	agent_info_ptr->thread_count   = agent_arg_ptr->node_count;
	agent_info_ptr->retry          = agent_arg_ptr->retry;
	agent_info_ptr->threads_active = 0;
	thread_ptr = xmalloc(agent_arg_ptr->node_count * sizeof(thd_t));
	agent_info_ptr->thread_struct  = thread_ptr;
	agent_info_ptr->msg_type       = agent_arg_ptr->msg_type;
	agent_info_ptr->msg_args_pptr  = &agent_arg_ptr->msg_args;

	/* A reply is read for every message type except shutdown and
	 * reconfigure. Assign the flag in both cases so that the "false"
	 * value does not depend on the allocator clearing the memory. */
	agent_info_ptr->get_reply =
		(agent_arg_ptr->msg_type != REQUEST_SHUTDOWN) &&
		(agent_arg_ptr->msg_type != REQUEST_RECONFIGURE);

	for (i = 0; i < agent_info_ptr->thread_count; i++) {
		thread_ptr[i].state      = DSH_NEW;
		thread_ptr[i].slurm_addr = agent_arg_ptr->slurm_addr[i];
		strncpy(thread_ptr[i].node_name,
			&agent_arg_ptr->node_names[i * MAX_NAME_LEN],
			MAX_NAME_LEN);
		/* strncpy() does not terminate the destination when the
		 * source fills all MAX_NAME_LEN bytes; force termination.
		 * Assumes node_name holds at least MAX_NAME_LEN bytes, as
		 * the copy length above already requires. */
		thread_ptr[i].node_name[MAX_NAME_LEN - 1] = '\0';
	}

	return agent_info_ptr;
}
/*
 * _make_task_data - build the per-thread argument record for the thread
 *	that will communicate with node number inx.
 * agent_info_ptr - agent-wide state to copy from and point back into
 * inx - index of the target node within agent_info_ptr->thread_struct
 * Returns a newly allocated task_info_t. The mutex, condition variable,
 *	active-thread counter and thd_t record are referenced by pointer
 *	rather than copied, so agent_info_ptr must outlive the result.
 */
static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx)
{
	task_info_t *task = xmalloc(sizeof(task_info_t));

	/* what to send, and whether to wait for an answer */
	task->msg_type     = agent_info_ptr->msg_type;
	task->msg_args_ptr = *agent_info_ptr->msg_args_pptr;
	task->get_reply    = agent_info_ptr->get_reply;

	/* this thread's own record within the agent's thd_t array */
	task->thread_struct_ptr = agent_info_ptr->thread_struct + inx;

	/* state owned by the agent and referenced from each task record */
	task->threads_active_ptr = &agent_info_ptr->threads_active;
	task->thread_cond_ptr    = &agent_info_ptr->thread_cond;
	task->thread_mutex_ptr   = &agent_info_ptr->thread_mutex;

	return task;
}
/*
* _wdog - Watchdog thread. Send SIGALRM to threads which have been active
* for too long.
......@@ -480,7 +506,7 @@ static void *_thread_per_node_rpc(void *args)
}
/* receive message as needed (most message types) */
if ((task_ptr->msg_type != REQUEST_SHUTDOWN) &&
if (task_ptr->get_reply &&
((msg_size = slurm_receive_msg(sockfd, response_msg))
== SLURM_SOCKET_ERROR)) {
error(
......@@ -496,7 +522,7 @@ static void *_thread_per_node_rpc(void *args)
thread_ptr->node_name);
goto cleanup;
}
if (task_ptr->msg_type == REQUEST_SHUTDOWN) {
if (!task_ptr->get_reply) {
thread_state = DSH_DONE;
goto cleanup;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment