diff --git a/src/slurmctld/agent.c b/src/slurmctld/agent.c index 612b1509d54d72da899081ebca6d0a76034790cf..6837952e11a56d69d4f3375ae13fd4a89f9db0da 100644 --- a/src/slurmctld/agent.c +++ b/src/slurmctld/agent.c @@ -104,6 +104,7 @@ typedef struct task_info { static void alarm_handler(int dummy); static void queue_agent_retry (agent_info_t *agent_info_ptr, int count); +static void spawn_retry_agent (agent_arg_t *agent_arg_ptr); static void *thread_per_node_rpc (void *args); static void *wdog (void *args); static void xsignal(int signal, void (*handler)(int)); @@ -132,7 +133,7 @@ agent (void *args) /* basic argument value tests */ if (agent_arg_ptr == NULL) fatal ("agent NULL argument"); - if (agent_arg_ptr->addr_count == 0) + if (agent_arg_ptr->node_count == 0) goto cleanup; /* no messages to be sent */ if (agent_arg_ptr->slurm_addr == NULL) fatal ("agent passed NULL address list"); @@ -149,10 +150,10 @@ agent (void *args) fatal (" pthread_mutex_init error %m"); if (pthread_cond_init (&agent_info_ptr->thread_cond, NULL)) fatal ("pthread_cond_init error %m"); - agent_info_ptr->thread_count = agent_arg_ptr->addr_count; + agent_info_ptr->thread_count = agent_arg_ptr->node_count; agent_info_ptr->retry = agent_arg_ptr->retry; agent_info_ptr->threads_active = 0; - thread_ptr = xmalloc (agent_arg_ptr->addr_count * sizeof (thd_t)); + thread_ptr = xmalloc (agent_arg_ptr->node_count * sizeof (thd_t)); agent_info_ptr->thread_struct = thread_ptr; agent_info_ptr->msg_type = agent_arg_ptr->msg_type; agent_info_ptr->msg_args_pptr = &agent_arg_ptr->msg_args; @@ -501,7 +502,7 @@ queue_agent_retry (agent_info_t *agent_info_ptr, int count) /* build agent argument with just the RPCs to retry */ agent_arg_ptr = xmalloc (sizeof (agent_arg_t)); - agent_arg_ptr -> addr_count = count; + agent_arg_ptr -> node_count = count; agent_arg_ptr -> retry = 1; agent_arg_ptr -> slurm_addr = xmalloc (sizeof (struct sockaddr_in) * count); agent_arg_ptr -> node_names = xmalloc (MAX_NAME_LEN * count); @@ 
-532,38 +533,78 @@ queue_agent_retry (agent_info_t *agent_info_ptr, int count) pthread_mutex_unlock (&retry_mutex); } -/* Agent for retrying RPCs */ +/* agent_retry - Agent for retrying pending RPCs (top one on the queue), + * argument is unused */ void * agent_retry (void *args) { agent_arg_t *agent_arg_ptr = NULL; - pthread_attr_t attr_agent; - pthread_t thread_agent; pthread_mutex_lock (&retry_mutex); if (retry_list) agent_arg_ptr = (agent_arg_t *) list_dequeue (retry_list); pthread_mutex_unlock (&retry_mutex); - if (agent_arg_ptr) { - debug3 ("Spawning RPC retry agent"); - if (pthread_attr_init (&attr_agent)) - fatal ("pthread_attr_init error %m"); - if (pthread_attr_setdetachstate (&attr_agent, PTHREAD_CREATE_DETACHED)) - error ("pthread_attr_setdetachstate error %m"); + if (agent_arg_ptr) + spawn_retry_agent (agent_arg_ptr); + + return NULL; +} + +/* retry_pending - retry all pending RPCs for the given node name */ +void +retry_pending (char *node_name) +{ + int list_size = 0, i, j, found; + agent_arg_t *agent_arg_ptr = NULL; + + pthread_mutex_lock (&retry_mutex); + if (retry_list) { + list_size = list_count (retry_list); + } + for (i = 0; i < list_size; i++) { + agent_arg_ptr = (agent_arg_t *) list_dequeue (retry_list); + found = 0; + for (j = 0; j < agent_arg_ptr->node_count; j++) { + if (strncmp (&agent_arg_ptr->node_names[j*MAX_NAME_LEN], + node_name, MAX_NAME_LEN)) + continue; + found = 1; + break; + } + if (found) /* issue this RPC */ + spawn_retry_agent (agent_arg_ptr); + else /* put the RPC back on the queue */ + list_enqueue (retry_list, (void*) agent_arg_ptr); + } + pthread_mutex_unlock (&retry_mutex); +} + +/* spawn_retry_agent - pthread_create an agent for the given task */ +void +spawn_retry_agent (agent_arg_t *agent_arg_ptr) +{ + pthread_attr_t attr_agent; + pthread_t thread_agent; + + if (agent_arg_ptr == NULL) + return; + + debug3 ("Spawning RPC retry agent"); + if (pthread_attr_init (&attr_agent)) + fatal ("pthread_attr_init error %m"); + if
(pthread_attr_setdetachstate (&attr_agent, PTHREAD_CREATE_DETACHED)) + error ("pthread_attr_setdetachstate error %m"); #ifdef PTHREAD_SCOPE_SYSTEM - if (pthread_attr_setscope (&attr_agent, PTHREAD_SCOPE_SYSTEM)) - error ("pthread_attr_setscope error %m"); + if (pthread_attr_setscope (&attr_agent, PTHREAD_SCOPE_SYSTEM)) + error ("pthread_attr_setscope error %m"); #endif + if (pthread_create (&thread_agent, &attr_agent, + agent, (void *)agent_arg_ptr)) { + error ("pthread_create error %m"); + sleep (1); /* sleep and try once more */ if (pthread_create (&thread_agent, &attr_agent, - agent, (void *)agent_arg_ptr)) { - error ("pthread_create error %m"); - sleep (1); /* sleep and try once more */ - if (pthread_create (&thread_agent, &attr_agent, - agent, (void *)agent_arg_ptr)) - fatal ("pthread_create error %m"); - } + agent, (void *)agent_arg_ptr)) + fatal ("pthread_create error %m"); } - - return NULL; } diff --git a/src/slurmctld/agent.h b/src/slurmctld/agent.h index 716de60f047616bcf8696aa45721cc7cd3fd7b7b..aed547c09f11d2dca4da919633630a91c4522e26 100644 --- a/src/slurmctld/agent.h +++ b/src/slurmctld/agent.h @@ -37,7 +37,7 @@ #define COMMAND_TIMEOUT 5 /* seconds */ typedef struct agent_arg { - uint32_t addr_count; /* number of nodes to communicate with */ + uint32_t node_count; /* number of nodes to communicate with */ uint16_t retry; /* if set, keep trying */ struct sockaddr_in *slurm_addr; /* array of network addresses */ char *node_names; /* array with MAX_NAME_LEN bytes per node */ @@ -45,7 +45,15 @@ typedef struct agent_arg { void *msg_args; /* RPC data to be transmitted */ } agent_arg_t; +/* agent - perform requested RPC in parallel and in the background, report status + * upon completion, input is pointer to agent_arg_t */ extern void *agent (void *args); + +/* agent_retry - Agent for retrying pending RPCs (top one on the queue), + * argument is unused */ extern void *agent_retry (void *args); +/* retry_pending - retry all pending RPCs for the given node 
name */ +extern void retry_pending (char *node_name); + #endif /* !_AGENT_H */ diff --git a/src/slurmctld/node_mgr.c b/src/slurmctld/node_mgr.c index 3d548208f2aca95b8f3b542281ae85fde6abf7b0..b81c188ac01b5ba9b49e2df242a6d57568195205 100644 --- a/src/slurmctld/node_mgr.c +++ b/src/slurmctld/node_mgr.c @@ -1046,6 +1046,7 @@ validate_node_specs (char *node_name, uint32_t cpus, int error_code; struct config_record *config_ptr; struct node_record *node_ptr; + uint16_t resp_state; node_ptr = find_node_record (node_name); if (node_ptr == NULL) @@ -1084,11 +1085,17 @@ validate_node_specs (char *node_name, uint32_t cpus, } else { info ("validate_node_specs: node %s has registered", node_name); + resp_state = node_ptr->node_state & NODE_STATE_NO_RESPOND; node_ptr->node_state &= (uint16_t) (~NODE_STATE_NO_RESPOND); if (node_ptr->node_state == NODE_STATE_UNKNOWN) node_ptr->node_state = NODE_STATE_IDLE; - if (node_ptr->node_state == NODE_STATE_IDLE) + if (node_ptr->node_state == NODE_STATE_IDLE) { bit_set (idle_node_bitmap, (node_ptr - node_record_table_ptr)); + if (resp_state) { + /* Node just started responding, do all pending RPCs now */ + retry_pending (node_name); + } + } if (node_ptr->node_state != NODE_STATE_DOWN) bit_set (up_node_bitmap, (node_ptr - node_record_table_ptr)); } @@ -1102,6 +1109,7 @@ node_did_resp (char *name) { struct node_record *node_ptr; int node_inx; + uint16_t resp_state; node_ptr = find_node_record (name); if (node_ptr == NULL) { @@ -1112,11 +1120,17 @@ node_did_resp (char *name) node_inx = node_ptr - node_record_table_ptr; last_node_update = time (NULL); node_record_table_ptr[node_inx].last_response = time (NULL); + resp_state = node_ptr->node_state & NODE_STATE_NO_RESPOND; node_ptr->node_state &= (uint16_t) (~NODE_STATE_NO_RESPOND); if (node_ptr->node_state == NODE_STATE_UNKNOWN) node_ptr->node_state = NODE_STATE_IDLE; - if (node_ptr->node_state == NODE_STATE_IDLE) + if (node_ptr->node_state == NODE_STATE_IDLE) { bit_set (idle_node_bitmap, 
node_inx); + if (resp_state) { + /* Node just started responding, do all its pending RPCs now */ + retry_pending (name); + } + } if (node_ptr->node_state != NODE_STATE_DOWN) bit_set (up_node_bitmap, node_inx); return; @@ -1198,40 +1212,40 @@ ping_nodes (void) if (base_state == NODE_STATE_UNKNOWN) { debug3 ("attempt to register %s now", node_record_table_ptr[i].name); - if ((reg_agent_args->addr_count+1) > reg_buf_rec_size) { + if ((reg_agent_args->node_count+1) > reg_buf_rec_size) { reg_buf_rec_size += 32; xrealloc ((reg_agent_args->slurm_addr), (sizeof (struct sockaddr_in) * reg_buf_rec_size)); xrealloc ((reg_agent_args->node_names), (MAX_NAME_LEN * reg_buf_rec_size)); } - reg_agent_args->slurm_addr[reg_agent_args->addr_count] = + reg_agent_args->slurm_addr[reg_agent_args->node_count] = node_record_table_ptr[i].slurm_addr; - pos = MAX_NAME_LEN * reg_agent_args->addr_count; + pos = MAX_NAME_LEN * reg_agent_args->node_count; strncpy (&reg_agent_args->node_names[pos], node_record_table_ptr[i].name, MAX_NAME_LEN); - reg_agent_args->addr_count++; + reg_agent_args->node_count++; continue; } debug3 ("ping %s now", node_record_table_ptr[i].name); - if ((ping_agent_args->addr_count+1) > ping_buf_rec_size) { + if ((ping_agent_args->node_count+1) > ping_buf_rec_size) { ping_buf_rec_size += 32; xrealloc ((ping_agent_args->slurm_addr), (sizeof (struct sockaddr_in) * ping_buf_rec_size)); xrealloc ((ping_agent_args->node_names), (MAX_NAME_LEN * ping_buf_rec_size)); } - ping_agent_args->slurm_addr[ping_agent_args->addr_count] = + ping_agent_args->slurm_addr[ping_agent_args->node_count] = node_record_table_ptr[i].slurm_addr; - pos = MAX_NAME_LEN * ping_agent_args->addr_count; + pos = MAX_NAME_LEN * ping_agent_args->node_count; strncpy (&ping_agent_args->node_names[pos], node_record_table_ptr[i].name, MAX_NAME_LEN); - ping_agent_args->addr_count++; + ping_agent_args->node_count++; } - if (ping_agent_args->addr_count == 0) + if (ping_agent_args->node_count == 0) xfree
(ping_agent_args); else { debug ("Spawning ping agent"); @@ -1253,7 +1267,7 @@ ping_nodes (void) } } - if (reg_agent_args->addr_count == 0) + if (reg_agent_args->node_count == 0) xfree (reg_agent_args); else { debug ("Spawning node registration agent"); diff --git a/src/slurmctld/node_scheduler.c b/src/slurmctld/node_scheduler.c index 905d626d5e2c21a4b4cbfdd505091e4de5f44a4e..9d8cd8dc56fbba61085c55a040a4542e6cfd8724 100644 --- a/src/slurmctld/node_scheduler.c +++ b/src/slurmctld/node_scheduler.c @@ -132,18 +132,18 @@ deallocate_nodes (struct job_record * job_ptr) for (i = 0; i < node_record_count; i++) { if (bit_test (job_ptr->node_bitmap, i) == 0) continue; - if ((agent_args->addr_count+1) > buf_rec_size) { + if ((agent_args->node_count+1) > buf_rec_size) { buf_rec_size += 32; xrealloc ((agent_args->slurm_addr), (sizeof (struct sockaddr_in) * buf_rec_size)); xrealloc ((agent_args->node_names), (MAX_NAME_LEN * buf_rec_size)); } - agent_args->slurm_addr[agent_args->addr_count] = + agent_args->slurm_addr[agent_args->node_count] = node_record_table_ptr[i].slurm_addr; - strncpy (&agent_args->node_names[MAX_NAME_LEN*agent_args->addr_count], + strncpy (&agent_args->node_names[MAX_NAME_LEN*agent_args->node_count], node_record_table_ptr[i].name, MAX_NAME_LEN); - agent_args->addr_count++; + agent_args->node_count++; base_state = node_record_table_ptr[i].node_state & (~NODE_STATE_NO_RESPOND); no_resp_flag = node_record_table_ptr[i].node_state & NODE_STATE_NO_RESPOND; if (base_state == NODE_STATE_DRAINING) { diff --git a/src/slurmctld/slurmctld.h b/src/slurmctld/slurmctld.h index f5aea3547c7af02b441c7329e7fb833bacf33103..aedc3816e7b35a38b1ba433636deaad2e5e19445 100644 --- a/src/slurmctld/slurmctld.h +++ b/src/slurmctld/slurmctld.h @@ -50,7 +50,7 @@ #define PERIODIC_CHECKPOINT 300 /* Retry an incomplete RPC agent request every RPC_RETRY_INTERVAL seconds */ -#define RPC_RETRY_INTERVAL 300 +#define RPC_RETRY_INTERVAL 60 /* Attempt to schedule jobs every PERIODIC_SCHEDULE seconds 
despite any RPC activity * This will catch any state transisions that may have otherwise been missed */