Skip to content
Snippets Groups Projects
Commit 36d3104c authored by Moe Jette's avatar Moe Jette
Browse files

State non-responding state mapped to DOWN.

enhanced debugging added for agent, now passing node name along with address.
parent 8157d886
No related branches found
No related tags found
No related merge requests found
......@@ -54,6 +54,7 @@
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <src/common/log.h>
......@@ -61,6 +62,7 @@
#include <src/common/xmalloc.h>
#include <src/common/xstring.h>
#include <src/slurmctld/agent.h>
#include <src/slurmctld/locks.h>
#if COMMAND_TIMEOUT == 1
#define WDOG_POLL 1 /* secs */
......@@ -76,6 +78,7 @@ typedef struct thd {
state_t state; /* thread state */
time_t time; /* time stamp for start or delta time */
struct sockaddr_in slurm_addr; /* network address */
char node_name[MAX_NAME_LEN];/* node's name */
} thd_t;
typedef struct agent_info {
......@@ -89,12 +92,12 @@ typedef struct agent_info {
} agent_info_t;
typedef struct task_info {
pthread_mutex_t *thread_mutex; /* agent specific mutex */
pthread_cond_t *thread_cond; /* agent specific condition */
uint32_t *threads_active; /* count of currently active threads */
thd_t *thread_struct; /* thread structures */
pthread_mutex_t *thread_mutex_ptr; /* pointer to agent specific mutex */
pthread_cond_t *thread_cond_ptr; /* pointer to agent specific condition */
uint32_t *threads_active_ptr; /* pointer to count of currently active threads */
thd_t *thread_struct_ptr; /* pointer to thread structures */
slurm_msg_type_t msg_type; /* RPC to be issued */
void *msg_args; /* RPC data to be used */
void *msg_args_ptr; /* pointer to RPC data to be used */
} task_info_t;
static void *thread_per_node_rpc (void *args);
......@@ -103,8 +106,8 @@ static void *wdog (void *args);
/*
 * agent - party responsible for transmitting a common RPC in parallel across a set
* of nodes
* input: pointer to agent_arg_t, which is xfree'd (including slurm_addr and msg_args)
* upon completion if AGENT_IS_THREAD is set
* input: pointer to agent_arg_t, which is xfree'd (including slurm_addr, node_names,
* and msg_args) upon completion if AGENT_IS_THREAD is set
*/
void *
agent (void *args)
......@@ -124,6 +127,8 @@ agent (void *args)
goto cleanup; /* no messages to be sent */
if (agent_arg_ptr->slurm_addr == NULL)
fatal ("agent passed NULL address list");
if (agent_arg_ptr->node_names == NULL)
fatal ("agent passed NULL node name list");
if (agent_arg_ptr->msg_type != REQUEST_REVOKE_JOB_CREDENTIAL)
fatal ("agent passed invalid message type %d", agent_arg_ptr->msg_type);
......@@ -142,6 +147,8 @@ agent (void *args)
for (i = 0; i < agent_info_ptr->thread_count; i++) {
thread_ptr[i].state = DSH_NEW;
thread_ptr[i].slurm_addr = agent_arg_ptr->slurm_addr[i];
strncpy (thread_ptr[i].node_name,
&agent_arg_ptr->node_names[i*MAX_NAME_LEN], MAX_NAME_LEN);
}
/* start the watchdog thread */
......@@ -173,14 +180,14 @@ agent (void *args)
&agent_info_ptr->thread_mutex);
}
/* create thread */
/* create thread, note this is freed from thread_per_node_rpc() */
task_specific_ptr = xmalloc (sizeof (task_info_t));
task_specific_ptr->thread_mutex = &agent_info_ptr->thread_mutex;
task_specific_ptr->thread_cond = &agent_info_ptr->thread_cond;
task_specific_ptr->threads_active = &agent_info_ptr->threads_active;
task_specific_ptr->thread_struct = &thread_ptr[i];
task_specific_ptr->thread_mutex_ptr = &agent_info_ptr->thread_mutex;
task_specific_ptr->thread_cond_ptr = &agent_info_ptr->thread_cond;
task_specific_ptr->threads_active_ptr = &agent_info_ptr->threads_active;
task_specific_ptr->thread_struct_ptr = &thread_ptr[i];
task_specific_ptr->msg_type = agent_info_ptr->msg_type;
task_specific_ptr->msg_args = &agent_info_ptr->msg_args;
task_specific_ptr->msg_args_ptr = &agent_info_ptr->msg_args;
if (pthread_attr_init (&thread_ptr[i].attr))
fatal ("pthread_attr_init error %m");
......@@ -217,6 +224,8 @@ cleanup:
if (agent_arg_ptr) {
if (agent_arg_ptr->slurm_addr)
xfree (agent_arg_ptr->slurm_addr);
if (agent_arg_ptr->node_names)
xfree (agent_arg_ptr->node_names);
if (agent_arg_ptr->msg_args)
xfree (agent_arg_ptr->msg_args);
xfree (agent_arg_ptr);
......@@ -240,6 +249,12 @@ wdog (void *args)
int i, fail_cnt, work_done, delay, max_delay = 0;
agent_info_t *agent_ptr = (agent_info_t *) args;
thd_t *thread_ptr = agent_ptr->thread_struct;
#if AGENT_IS_THREAD
/* Locks: Write node */
slurmctld_lock_t node_write_lock = { NO_LOCK, NO_LOCK, WRITE_LOCK, NO_LOCK };
#else
struct sockaddr_in *slurm_addr;
#endif
while (1) {
work_done = 1; /* assume all threads complete for now */
......@@ -274,30 +289,37 @@ wdog (void *args)
/* Notify slurmctld of non-responding nodes */
if (fail_cnt) {
struct sockaddr_in *slurm_addr;
#if AGENT_IS_THREAD
/* Update node table data for non-responding nodes */
lock_slurmctld (node_write_lock);
for (i = 0; i < agent_ptr->thread_count; i++) {
if (thread_ptr[i].state == DSH_FAILED)
node_not_resp (thread_ptr[i].node_name);
}
unlock_slurmctld (node_write_lock);
#else
/* Build a list of all non-responding nodes and send it to slurmctld */
error ("agent/wdog: %d nodes failed to respond", fail_cnt);
slurm_addr = xmalloc (fail_cnt * sizeof (struct sockaddr_in));
fail_cnt = 0;
for (i = 0; i < agent_ptr->thread_count; i++) {
if (thread_ptr[i].state != DSH_FAILED)
continue;
/* build a list of slurm_addr's */
slurm_addr[fail_cnt++] = thread_ptr[i].slurm_addr;
xfree (thread_ptr);
if (thread_ptr[i].state == DSH_FAILED)
slurm_addr[fail_cnt++] = thread_ptr[i].slurm_addr;
}
info ("agent/wdog: %d nodes failed to respond", fail_cnt);
/* send RPC */
fatal ("Code development needed here if agent is not thread");
xfree (slurm_addr);
#endif
}
info ("agent maximum delay %d seconds", max_delay);
debug ("agent maximum delay %d seconds", max_delay);
pthread_mutex_unlock (&agent_ptr->thread_mutex);
return (void *) NULL;
}
/* thread_per_node_rpc - thread to revoke a credential on a collection of nodes */
/* thread_per_node_rpc - thread to revoke a credential on a collection of nodes
* This xfrees the argument passed to it */
static void *
thread_per_node_rpc (void *args)
{
......@@ -309,7 +331,7 @@ thread_per_node_rpc (void *args)
slurm_msg_t response_msg ;
return_code_msg_t * slurm_rc_msg ;
task_info_t *task_ptr = (task_info_t *) args;
thd_t *thread_ptr = task_ptr->thread_struct;
thd_t *thread_ptr = task_ptr->thread_struct_ptr;
state_t thread_state = DSH_FAILED;
/* accept SIGALRM */
......@@ -322,10 +344,10 @@ thread_per_node_rpc (void *args)
if (args == NULL)
fatal ("thread_per_node_rpc has NULL argument");
pthread_mutex_lock (task_ptr->thread_mutex);
pthread_mutex_lock (task_ptr->thread_mutex_ptr);
thread_ptr->state = DSH_ACTIVE;
thread_ptr->time = time (NULL);
pthread_mutex_unlock (task_ptr->thread_mutex);
pthread_mutex_unlock (task_ptr->thread_mutex_ptr);
/* init message connection for message communication */
if ( ( sockfd = slurm_open_msg_conn (& thread_ptr->slurm_addr) ) == SLURM_SOCKET_ERROR ) {
......@@ -335,7 +357,7 @@ thread_per_node_rpc (void *args)
/* send request message */
request_msg . msg_type = task_ptr->msg_type ;
request_msg . data = task_ptr->msg_args ;
request_msg . data = task_ptr->msg_args_ptr ;
if ( ( rc = slurm_send_node_msg ( sockfd , & request_msg ) ) == SLURM_SOCKET_ERROR ) {
error ("thread_per_node_rpc/slurm_send_node_msg error %m");
goto cleanup;
......@@ -365,8 +387,11 @@ thread_per_node_rpc (void *args)
slurm_free_return_code_msg ( slurm_rc_msg );
if (rc)
error ("thread_per_node_rpc/rc error %d", rc);
else
else {
debug3 ("agent sucessfully processed RPC to node %s",
thread_ptr->node_name);
thread_state = DSH_DONE;
}
break ;
default:
......@@ -375,16 +400,16 @@ thread_per_node_rpc (void *args)
}
cleanup:
pthread_mutex_lock (task_ptr->thread_mutex);
pthread_mutex_lock (task_ptr->thread_mutex_ptr);
thread_ptr->state = thread_state;
thread_ptr->time = (time_t) difftime (time (NULL), thread_ptr->time);
/* Signal completion so another thread can replace us */
(*task_ptr->threads_active)--;
pthread_cond_signal(task_ptr->thread_cond);
pthread_mutex_unlock (task_ptr->thread_mutex);
(*task_ptr->threads_active_ptr)--;
pthread_cond_signal(task_ptr->thread_cond_ptr);
pthread_mutex_unlock (task_ptr->thread_mutex_ptr);
xfree (task_ptr);
xfree (args);
return (void *) NULL;
}
......@@ -39,6 +39,7 @@
/*
 * agent_arg - description of one RPC to be issued by agent() to a set of nodes.
 * slurm_addr and node_names are parallel arrays: entry i of each describes
 * the same node. agent() xfree's slurm_addr, node_names, msg_args and the
 * structure itself in its cleanup path.
 */
typedef struct agent_arg {
uint32_t addr_count; /* number of network addresses (nodes) to communicate with */
struct sockaddr_in *slurm_addr; /* array of network addresses, one per node */
char *node_names; /* packed node names, MAX_NAME_LEN bytes per node;
                   * the name for node i starts at &node_names[i*MAX_NAME_LEN] */
slurm_msg_type_t msg_type; /* RPC to be issued */
void *msg_args; /* RPC data to be transmitted */
} agent_arg_t;
......
......@@ -1072,3 +1072,26 @@ validate_node_specs (char *node_name, uint32_t cpus,
return error_code;
}
/*
 * node_not_resp - record that the specified node is not responding
 * input: name - name of the node which failed to respond
 * globals: node_record_table_ptr - node table, the node's flags are updated
 *          up_node_bitmap, idle_node_bitmap - the node's bit is cleared in each
 *          last_node_update - set to the current time
 * NOTE: an unknown node name is logged and otherwise ignored
 */
void
node_not_resp (char *name)
{
	struct node_record *rec_ptr = find_node_record (name);
	int node_inx;

	if (rec_ptr == NULL) {
		error ("node_not_resp unable to find node %s", name);
		return;
	}

	/* index of the record within the node table, also its bitmap offset */
	node_inx = rec_ptr - node_record_table_ptr;

	last_node_update = time (NULL);
	error ("Node %s not responding", name);

	/* a non-responding node is neither up nor idle */
	bit_clear (up_node_bitmap, node_inx);
	bit_clear (idle_node_bitmap, node_inx);

	/* flag is OR'ed in so the rest of the node state is preserved */
	node_record_table_ptr[node_inx].node_state |= NODE_STATE_NO_RESPOND;
}
......@@ -119,6 +119,7 @@ deallocate_nodes (struct job_record * job_ptr)
agent_arg_t *agent_args;
pthread_attr_t attr_agent;
pthread_t thread_agent;
int buf_rec_size = 0;
agent_args = xmalloc (sizeof (agent_arg_t));
agent_args->msg_type = REQUEST_REVOKE_JOB_CREDENTIAL;
......@@ -133,10 +134,15 @@ deallocate_nodes (struct job_record * job_ptr)
if (bit_test (job_ptr->node_bitmap, i) == 0)
continue;
#if AGENT_TEST
xrealloc ((agent_args->slurm_addr),
(sizeof (struct sockaddr_in) * (agent_args->addr_count+1)));
agent_args->slurm_addr[(agent_args->addr_count++)] =
node_record_table_ptr[i].slurm_addr;
if ((agent_args->addr_count+1) > buf_rec_size) {
buf_rec_size += 32;
xrealloc ((agent_args->slurm_addr), (sizeof (struct sockaddr_in) * buf_rec_size));
xrealloc ((agent_args->node_names), (MAX_NAME_LEN * buf_rec_size));
}
agent_args->slurm_addr[agent_args->addr_count] = node_record_table_ptr[i].slurm_addr;
strncpy (&agent_args->node_names[MAX_NAME_LEN*agent_args->addr_count],
node_record_table_ptr[i].name, MAX_NAME_LEN);
agent_args->addr_count++;
#else
slurm_revoke_job_cred (&node_record_table_ptr[i], revoke_job_cred);
#endif
......
......@@ -396,6 +396,9 @@ int mkdir2 (char * path, int modes);
/* node_name2bitmap - given a node name regular expression, build a bitmap representation */
extern int node_name2bitmap (char *node_names, bitstr_t **bitmap);
/*
 * node_not_resp - record that the specified node is not responding
 * input: name - name of the node; if no matching node record is found
 *               an error is logged and nothing else is changed
 * The node's bits in the up and idle node bitmaps are cleared, its state
 * has NODE_STATE_NO_RESPOND set, and last_node_update is refreshed.
 */
extern void node_not_resp (char *name);
/*
* pack_all_jobs - dump all job information for all jobs in
* machine independent form (for network transmission)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment