Skip to content
Snippets Groups Projects
Commit ee97c2d6 authored by Danny Auble's avatar Danny Auble
Browse files

moved things around for cleaner code.

parent 6c2b33d8
No related branches found
No related tags found
No related merge requests found
...@@ -92,6 +92,15 @@ typedef enum { ...@@ -92,6 +92,15 @@ typedef enum {
DSH_FAILED /* Request resulted in error */ DSH_FAILED /* Request resulted in error */
} state_t; } state_t;
/*
 * Summary of agent thread states collected by the watchdog. _wdog() resets
 * work_done and the counters at the top of each pass and
 * _update_wdog_state() updates them once per examined state.
 */
typedef struct thd_complete {
bool work_done; /* cleared when a DSH_NEW or DSH_ACTIVE state is seen */
int fail_cnt; /* number of DSH_FAILED states seen */
int no_resp_cnt; /* number of DSH_NO_RESP states seen */
int retry_cnt; /* incremented together with no_resp_cnt */
int max_delay; /* largest (int) end_time among DSH_DONE states */
time_t now; /* time compared against end_time of DSH_ACTIVE threads */
} thd_complete_t;
typedef struct thd { typedef struct thd {
pthread_t thread; /* thread ID */ pthread_t thread; /* thread ID */
pthread_attr_t attr; /* thread attributes */ pthread_attr_t attr; /* thread attributes */
...@@ -363,7 +372,7 @@ static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr) ...@@ -363,7 +372,7 @@ static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr)
for (i = 0; i < agent_info_ptr->thread_count; i++) { for (i = 0; i < agent_info_ptr->thread_count; i++) {
thread_ptr[thr_count].state = DSH_NEW; thread_ptr[thr_count].state = DSH_NEW;
thread_ptr[thr_count].slurm_addr = thread_ptr[thr_count].slurm_addr =
agent_arg_ptr->slurm_addr[i]; agent_arg_ptr->slurm_addr[i];
strncpy(thread_ptr[thr_count].node_name, strncpy(thread_ptr[thr_count].node_name,
&agent_arg_ptr->node_names[i * MAX_NAME_LEN], &agent_arg_ptr->node_names[i * MAX_NAME_LEN],
MAX_NAME_LEN); MAX_NAME_LEN);
...@@ -396,6 +405,39 @@ static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx) ...@@ -396,6 +405,39 @@ static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx)
return task_info_ptr; return task_info_ptr;
} }
/*
 * _update_wdog_state - Fold a single state value into the watchdog summary.
 * IN thread_ptr - thread record providing end_time and the pthread id
 * IN/OUT state - state to examine; rewritten to DSH_NO_RESP when a timed-out
 *	DSH_ACTIVE thread cannot be signalled (pthread_kill returns ESRCH)
 * IN/OUT thd_comp - summary being accumulated (flags, counters, max delay)
 *
 * The state is sampled once on entry, so a state rewritten to DSH_NO_RESP
 * here is not counted in no_resp_cnt/retry_cnt by this same call.
 */
static void _update_wdog_state(thd_t *thread_ptr,
			       state_t *state,
			       thd_complete_t *thd_comp)
{
	const state_t current = *state;

	if (current == DSH_NEW) {
		/* not started yet, so the agent still has work pending */
		thd_comp->work_done = false;
		return;
	}

	if (current == DSH_ACTIVE) {
		thd_comp->work_done = false;
		if (thread_ptr->end_time <= thd_comp->now) {
			debug3("agent thread %lu timed out\n",
			       (unsigned long) thread_ptr->thread);
			/* ESRCH: no such thread is left to receive SIGALRM */
			if (pthread_kill(thread_ptr->thread, SIGALRM) == ESRCH)
				*state = DSH_NO_RESP;
		}
		return;
	}

	if (current == DSH_DONE) {
		int delay = (int) thread_ptr->end_time;

		if (thd_comp->max_delay < delay)
			thd_comp->max_delay = delay;
		return;
	}

	if (current == DSH_NO_RESP) {
		thd_comp->no_resp_cnt++;
		thd_comp->retry_cnt++;
		return;
	}

	if (current == DSH_FAILED)
		thd_comp->fail_cnt++;
}
/* /*
* _wdog - Watchdog thread. Send SIGALRM to threads which have been active * _wdog - Watchdog thread. Send SIGALRM to threads which have been active
* for too long. * for too long.
...@@ -404,9 +446,8 @@ static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx) ...@@ -404,9 +446,8 @@ static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx)
*/ */
static void *_wdog(void *args) static void *_wdog(void *args)
{ {
int fail_cnt, no_resp_cnt, retry_cnt; bool srun_agent = false;
bool work_done, srun_agent = false; int i;
int i, max_delay = 0;
agent_info_t *agent_ptr = (agent_info_t *) args; agent_info_t *agent_ptr = (agent_info_t *) args;
thd_t *thread_ptr = agent_ptr->thread_struct; thd_t *thread_ptr = agent_ptr->thread_struct;
unsigned long usec = 1250000; unsigned long usec = 1250000;
...@@ -415,6 +456,8 @@ static void *_wdog(void *args) ...@@ -415,6 +456,8 @@ static void *_wdog(void *args)
ret_types_t *ret_type = NULL; ret_types_t *ret_type = NULL;
state_t state; state_t state;
int is_ret_list = 1; int is_ret_list = 1;
thd_complete_t thd_comp;
if ( (agent_ptr->msg_type == SRUN_PING) || if ( (agent_ptr->msg_type == SRUN_PING) ||
(agent_ptr->msg_type == SRUN_TIMEOUT) || (agent_ptr->msg_type == SRUN_TIMEOUT) ||
...@@ -422,87 +465,51 @@ static void *_wdog(void *args) ...@@ -422,87 +465,51 @@ static void *_wdog(void *args)
(agent_ptr->msg_type == SRUN_NODE_FAIL) ) (agent_ptr->msg_type == SRUN_NODE_FAIL) )
srun_agent = true; srun_agent = true;
thd_comp.max_delay = 0;
while (1) { while (1) {
work_done = true; /* assume all threads complete */ thd_comp.work_done = true;/* assume all threads complete */
fail_cnt = 0; /* assume no threads failures */ thd_comp.fail_cnt = 0; /* assume no threads failures */
no_resp_cnt = 0; /* assume all threads respond */ thd_comp.no_resp_cnt = 0; /* assume all threads respond */
retry_cnt = 0; /* assume no required retries */ thd_comp.retry_cnt = 0; /* assume no required retries */
thd_comp.now = time(NULL);
usleep(usec); usleep(usec);
usec = MIN((usec * 2), 1000000); usec = MIN((usec * 2), 1000000);
now = time(NULL);
slurm_mutex_lock(&agent_ptr->thread_mutex); slurm_mutex_lock(&agent_ptr->thread_mutex);
for (i = 0; i < agent_ptr->thread_count; i++) { for (i = 0; i < agent_ptr->thread_count; i++) {
if(!thread_ptr[i].ret_list) { if(!thread_ptr[i].ret_list) {
state = thread_ptr[i].state; _update_wdog_state(&thread_ptr[i],
is_ret_list = 0; &thread_ptr[i].state,
goto switch_on_state; &thd_comp);
} } else {
is_ret_list = 1; itr = list_iterator_create(
itr = list_iterator_create(thread_ptr[i].ret_list); thread_ptr[i].ret_list);
while((ret_type = list_next(itr)) != NULL) { while((ret_type = list_next(itr)) != NULL) {
state = ret_type->msg_rc; _update_wdog_state(
switch_on_state: &thread_ptr[i],
switch(state) { (state_t *)&ret_type->msg_rc,
case DSH_ACTIVE: &thd_comp);
work_done = false;
if (thread_ptr[i].end_time <= now) {
debug3("agent thread %lu "
"timed out\n",
(unsigned long)
thread_ptr[i].thread);
if (pthread_kill(thread_ptr[i].
thread,
SIGALRM)
== ESRCH) {
if(is_ret_list)
ret_type->
msg_rc =
DSH_NO_RESP;
else
thread_ptr[i].
state =
DSH_NO_RESP;
}
}
break;
case DSH_NEW:
work_done = false;
break;
case DSH_DONE:
if (max_delay <
(int)thread_ptr[i].end_time)
max_delay =
(int)thread_ptr[i].
end_time;
break;
case DSH_NO_RESP:
no_resp_cnt++;
retry_cnt++;
break;
case DSH_FAILED:
fail_cnt++;
break;
} }
if(!is_ret_list) list_iterator_destroy(itr);
goto is_work_done;
} }
list_iterator_destroy(itr);
} }
is_work_done: if (thd_comp.work_done)
if (work_done)
break; break;
slurm_mutex_unlock(&agent_ptr->thread_mutex); slurm_mutex_unlock(&agent_ptr->thread_mutex);
} }
if (srun_agent) { if (srun_agent) {
_notify_slurmctld_jobs(agent_ptr); _notify_slurmctld_jobs(agent_ptr);
} else { } else {
_notify_slurmctld_nodes(agent_ptr, no_resp_cnt, retry_cnt); _notify_slurmctld_nodes(agent_ptr,
thd_comp.no_resp_cnt,
thd_comp.retry_cnt);
} }
if (max_delay) if (thd_comp.max_delay)
debug2("agent maximum delay %d seconds", max_delay); debug2("agent maximum delay %d seconds", thd_comp.max_delay);
slurm_mutex_unlock(&agent_ptr->thread_mutex); slurm_mutex_unlock(&agent_ptr->thread_mutex);
return (void *) NULL; return (void *) NULL;
...@@ -550,7 +557,7 @@ static void _notify_slurmctld_jobs(agent_info_t *agent_ptr) ...@@ -550,7 +557,7 @@ static void _notify_slurmctld_jobs(agent_info_t *agent_ptr)
} }
static void _notify_slurmctld_nodes(agent_info_t *agent_ptr, static void _notify_slurmctld_nodes(agent_info_t *agent_ptr,
int no_resp_cnt, int retry_cnt) int no_resp_cnt, int retry_cnt)
{ {
ListIterator itr; ListIterator itr;
ListIterator data_itr; ListIterator data_itr;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment