From 44c92b15e2b2bf9aca38f1c93b98eca0716fd1a0 Mon Sep 17 00:00:00 2001
From: Moe Jette <jette1@llnl.gov>
Date: Wed, 25 Sep 2002 00:36:39 +0000
Subject: [PATCH] Re-issue pending RPCs for a node when it is restored to
 service.

---
 src/slurmctld/agent.c          | 89 +++++++++++++++++++++++++---------
 src/slurmctld/agent.h          | 10 +++-
 src/slurmctld/node_mgr.c       | 38 ++++++++++-----
 src/slurmctld/node_scheduler.c |  8 +--
 src/slurmctld/slurmctld.h      |  2 +-
 5 files changed, 105 insertions(+), 42 deletions(-)

diff --git a/src/slurmctld/agent.c b/src/slurmctld/agent.c
index 612b1509d54..6837952e11a 100644
--- a/src/slurmctld/agent.c
+++ b/src/slurmctld/agent.c
@@ -104,6 +104,7 @@ typedef struct task_info {
 
 static void alarm_handler(int dummy);
 static void queue_agent_retry (agent_info_t *agent_info_ptr, int count);
+static void spawn_retry_agent (agent_arg_t *agent_arg_ptr);
 static void *thread_per_node_rpc (void *args);
 static void *wdog (void *args);
 static void xsignal(int signal, void (*handler)(int));
@@ -132,7 +133,7 @@ agent (void *args)
 	/* basic argument value tests */
 	if (agent_arg_ptr == NULL)
 		fatal ("agent NULL argument");
-	if (agent_arg_ptr->addr_count == 0)
+	if (agent_arg_ptr->node_count == 0)
 		goto cleanup;	/* no messages to be sent */
 	if (agent_arg_ptr->slurm_addr == NULL)
 		fatal ("agent passed NULL address list");
@@ -149,10 +150,10 @@ agent (void *args)
 		fatal (" pthread_mutex_init error %m");
 	if (pthread_cond_init (&agent_info_ptr->thread_cond, NULL))
 		fatal ("pthread_cond_init error %m");
-	agent_info_ptr->thread_count = agent_arg_ptr->addr_count;
+	agent_info_ptr->thread_count = agent_arg_ptr->node_count;
 	agent_info_ptr->retry = agent_arg_ptr->retry;
 	agent_info_ptr->threads_active = 0;
-	thread_ptr = xmalloc (agent_arg_ptr->addr_count * sizeof (thd_t));
+	thread_ptr = xmalloc (agent_arg_ptr->node_count * sizeof (thd_t));
 	agent_info_ptr->thread_struct = thread_ptr;
 	agent_info_ptr->msg_type = agent_arg_ptr->msg_type;
 	agent_info_ptr->msg_args_pptr = &agent_arg_ptr->msg_args;
@@ -501,7 +502,7 @@ queue_agent_retry (agent_info_t *agent_info_ptr, int count)
 
 	/* build agent argument with just the RPCs to retry */
 	agent_arg_ptr = xmalloc (sizeof (agent_arg_t));
-	agent_arg_ptr -> addr_count = count;
+	agent_arg_ptr -> node_count = count;
 	agent_arg_ptr -> retry = 1;
 	agent_arg_ptr -> slurm_addr = xmalloc (sizeof (struct sockaddr_in) * count);
 	agent_arg_ptr -> node_names = xmalloc (MAX_NAME_LEN * count);
@@ -532,38 +533,78 @@ queue_agent_retry (agent_info_t *agent_info_ptr, int count)
 	pthread_mutex_unlock (&retry_mutex);
 }
 
-/* Agent for retrying RPCs */
+/* agent_retry - Agent for retrying pending RPCs (top one on the queue), 
+ *	argument is unused */
 void *
 agent_retry (void *args)
 {
 	agent_arg_t *agent_arg_ptr = NULL;
-	pthread_attr_t attr_agent;
-	pthread_t thread_agent;
 
 	pthread_mutex_lock (&retry_mutex);
 	if (retry_list)
 		agent_arg_ptr = (agent_arg_t *) list_dequeue (retry_list);
 	pthread_mutex_unlock (&retry_mutex);
 
-	if (agent_arg_ptr) {
-		debug3 ("Spawning RPC retry agent");
-		if (pthread_attr_init (&attr_agent))
-			fatal ("pthread_attr_init error %m");
-		if (pthread_attr_setdetachstate (&attr_agent, PTHREAD_CREATE_DETACHED))
-			error ("pthread_attr_setdetachstate error %m");
+	if (agent_arg_ptr)
+		spawn_retry_agent (agent_arg_ptr);
+
+	return NULL;
+}
+
+/* retry_pending - retry all pending RPCs for the given node name */
+void
+retry_pending (char *node_name)
+{
+	int list_size = 0, i, j, found;
+	agent_arg_t *agent_arg_ptr = NULL;
+
+	pthread_mutex_lock (&retry_mutex);
+	if (retry_list) {
+		list_size = list_count (retry_list);
+	}
+	for (i = 0; i < list_size; i++) {
+		agent_arg_ptr = (agent_arg_t *) list_dequeue (retry_list);
+		found = 0;
+		for (j = 0; j < agent_arg_ptr->node_count; j++) {
+			if (strncmp (&agent_arg_ptr->node_names[j*MAX_NAME_LEN],
+			             node_name, MAX_NAME_LEN))
+				continue;
+			found = 1;
+			break;
+		}
+		if (found)	/* issue this RPC */
+			spawn_retry_agent (agent_arg_ptr);
+		else		/* put the RPC back on the queue */
+			list_enqueue (retry_list, (void*) agent_arg_ptr);
+	}
+	pthread_mutex_unlock (&retry_mutex);
+}
+
+/* spawn_retry_agent - pthread_crate an agent for the given task */
+void
+spawn_retry_agent (agent_arg_t *agent_arg_ptr)
+{
+	pthread_attr_t attr_agent;
+	pthread_t thread_agent;
+
+	if (agent_arg_ptr == NULL)
+		return;
+
+	debug3 ("Spawning RPC retry agent");
+	if (pthread_attr_init (&attr_agent))
+		fatal ("pthread_attr_init error %m");
+	if (pthread_attr_setdetachstate (&attr_agent, PTHREAD_CREATE_DETACHED))
+		error ("pthread_attr_setdetachstate error %m");
 #ifdef PTHREAD_SCOPE_SYSTEM
-		if (pthread_attr_setscope (&attr_agent, PTHREAD_SCOPE_SYSTEM))
-			error ("pthread_attr_setscope error %m");
+	if (pthread_attr_setscope (&attr_agent, PTHREAD_SCOPE_SYSTEM))
+		error ("pthread_attr_setscope error %m");
 #endif
+	if (pthread_create (&thread_agent, &attr_agent, 
+				agent, (void *)agent_arg_ptr)) {
+		error ("pthread_create error %m");
+		sleep (1); /* sleep and try once more */
 		if (pthread_create (&thread_agent, &attr_agent, 
-					agent, (void *)agent_arg_ptr)) {
-			error ("pthread_create error %m");
-			sleep (1); /* sleep and try once more */
-			if (pthread_create (&thread_agent, &attr_agent, 
-						agent, (void *)agent_arg_ptr))
-				fatal ("pthread_create error %m");
-		}
+					agent, (void *)agent_arg_ptr))
+			fatal ("pthread_create error %m");
 	}
-
-	return NULL;
 }
diff --git a/src/slurmctld/agent.h b/src/slurmctld/agent.h
index 716de60f047..aed547c09f1 100644
--- a/src/slurmctld/agent.h
+++ b/src/slurmctld/agent.h
@@ -37,7 +37,7 @@
 #define COMMAND_TIMEOUT 	5	/* seconds */
 
 typedef struct agent_arg {
-	uint32_t	addr_count;		/* number of nodes to communicate with */
+	uint32_t	node_count;		/* number of nodes to communicate with */
 	uint16_t	retry;			/* if set, keep trying */
 	struct sockaddr_in *slurm_addr;		/* array of network addresses */
 	char		*node_names;		/* array with MAX_NAME_LEN bytes per node */
@@ -45,7 +45,15 @@ typedef struct agent_arg {
 	void		*msg_args;		/* RPC data to be transmitted */
 } agent_arg_t;
 
+/* agent - perform requested RPC in parallel and in the background, report status 
+ *	upon completion, input is pointer to agent_arg_t */
 extern void *agent (void *args);
+
+/* agent_retry - Agent for retrying pending RPCs (top one on the queue), 
+ *	argument is unused */
 extern void *agent_retry (void *args);
 
+/* retry_pending - retry all pending RPCs for the given node name */
+extern void retry_pending (char *node_name);
+
 #endif /* !_AGENT_H */
diff --git a/src/slurmctld/node_mgr.c b/src/slurmctld/node_mgr.c
index 3d548208f2a..b81c188ac01 100644
--- a/src/slurmctld/node_mgr.c
+++ b/src/slurmctld/node_mgr.c
@@ -1046,6 +1046,7 @@ validate_node_specs (char *node_name, uint32_t cpus,
 	int error_code;
 	struct config_record *config_ptr;
 	struct node_record *node_ptr;
+	uint16_t resp_state;
 
 	node_ptr = find_node_record (node_name);
 	if (node_ptr == NULL)
@@ -1084,11 +1085,17 @@ validate_node_specs (char *node_name, uint32_t cpus,
 	}
 	else {
 		info ("validate_node_specs: node %s has registered", node_name);
+		resp_state = node_ptr->node_state & NODE_STATE_NO_RESPOND;
 		node_ptr->node_state &= (uint16_t) (~NODE_STATE_NO_RESPOND);
 		if (node_ptr->node_state == NODE_STATE_UNKNOWN)
 			node_ptr->node_state = NODE_STATE_IDLE;
-		if (node_ptr->node_state == NODE_STATE_IDLE)
+		if (node_ptr->node_state == NODE_STATE_IDLE) {
 			bit_set (idle_node_bitmap, (node_ptr - node_record_table_ptr));
+			if (resp_state)	{
+				/* Node just started responding, do all pending RPCs now */
+				retry_pending (node_name);
+			}
+		}
 		if (node_ptr->node_state != NODE_STATE_DOWN)
 			bit_set (up_node_bitmap, (node_ptr - node_record_table_ptr));
 	}
@@ -1102,6 +1109,7 @@ node_did_resp (char *name)
 {
 	struct node_record *node_ptr;
 	int node_inx;
+	uint16_t resp_state;
 
 	node_ptr = find_node_record (name);
 	if (node_ptr == NULL) {
@@ -1112,11 +1120,17 @@ node_did_resp (char *name)
 	node_inx = node_ptr - node_record_table_ptr;
 	last_node_update = time (NULL);
 	node_record_table_ptr[node_inx].last_response = time (NULL);
+	resp_state = node_ptr->node_state & NODE_STATE_NO_RESPOND;
 	node_ptr->node_state &= (uint16_t) (~NODE_STATE_NO_RESPOND);
 	if (node_ptr->node_state == NODE_STATE_UNKNOWN)
 		node_ptr->node_state = NODE_STATE_IDLE;
-	if (node_ptr->node_state == NODE_STATE_IDLE)
+	if (node_ptr->node_state == NODE_STATE_IDLE) {
 		bit_set (idle_node_bitmap, node_inx);
+		if (resp_state)	{
+			/* Node just started responding, do all its pending RPCs now */
+			retry_pending (name);
+		}
+	}
 	if (node_ptr->node_state != NODE_STATE_DOWN)
 		bit_set (up_node_bitmap, node_inx);
 	return;
@@ -1198,40 +1212,40 @@ ping_nodes (void)
 
 		if (base_state == NODE_STATE_UNKNOWN) {
 			debug3 ("attempt to register %s now", node_record_table_ptr[i].name);
-			if ((reg_agent_args->addr_count+1) > reg_buf_rec_size) {
+			if ((reg_agent_args->node_count+1) > reg_buf_rec_size) {
 				reg_buf_rec_size += 32;
 				xrealloc ((reg_agent_args->slurm_addr), 
 				          (sizeof (struct sockaddr_in) * reg_buf_rec_size));
 				xrealloc ((reg_agent_args->node_names), 
 				          (MAX_NAME_LEN * reg_buf_rec_size));
 			}
-			reg_agent_args->slurm_addr[reg_agent_args->addr_count] = 
+			reg_agent_args->slurm_addr[reg_agent_args->node_count] = 
 						node_record_table_ptr[i].slurm_addr;
-			pos = MAX_NAME_LEN * reg_agent_args->addr_count;
+			pos = MAX_NAME_LEN * reg_agent_args->node_count;
 			strncpy (&reg_agent_args->node_names[pos],
 			         node_record_table_ptr[i].name, MAX_NAME_LEN);
-			reg_agent_args->addr_count++;
+			reg_agent_args->node_count++;
 			continue;
 		}
 
 		debug3 ("ping %s now", node_record_table_ptr[i].name);
-		if ((ping_agent_args->addr_count+1) > ping_buf_rec_size) {
+		if ((ping_agent_args->node_count+1) > ping_buf_rec_size) {
 			ping_buf_rec_size += 32;
 			xrealloc ((ping_agent_args->slurm_addr), 
 			          (sizeof (struct sockaddr_in) * ping_buf_rec_size));
 			xrealloc ((ping_agent_args->node_names), 
 			          (MAX_NAME_LEN * ping_buf_rec_size));
 		}
-		ping_agent_args->slurm_addr[ping_agent_args->addr_count] = 
+		ping_agent_args->slurm_addr[ping_agent_args->node_count] = 
 						node_record_table_ptr[i].slurm_addr;
-		pos = MAX_NAME_LEN * ping_agent_args->addr_count;
+		pos = MAX_NAME_LEN * ping_agent_args->node_count;
 		strncpy (&ping_agent_args->node_names[pos],
 		         node_record_table_ptr[i].name, MAX_NAME_LEN);
-		ping_agent_args->addr_count++;
+		ping_agent_args->node_count++;
 
 	}
 
-	if (ping_agent_args->addr_count == 0)
+	if (ping_agent_args->node_count == 0)
 		xfree (ping_agent_args);
 	else {
 		debug ("Spawning ping agent");
@@ -1253,7 +1267,7 @@ ping_nodes (void)
 		}
 	}
 
-	if (reg_agent_args->addr_count == 0)
+	if (reg_agent_args->node_count == 0)
 		xfree (reg_agent_args);
 	else {
 		debug ("Spawning node registration agent");
diff --git a/src/slurmctld/node_scheduler.c b/src/slurmctld/node_scheduler.c
index 905d626d5e2..9d8cd8dc56f 100644
--- a/src/slurmctld/node_scheduler.c
+++ b/src/slurmctld/node_scheduler.c
@@ -132,18 +132,18 @@ deallocate_nodes (struct job_record  * job_ptr)
 	for (i = 0; i < node_record_count; i++) {
 		if (bit_test (job_ptr->node_bitmap, i) == 0)
 			continue;
-		if ((agent_args->addr_count+1) > buf_rec_size) {
+		if ((agent_args->node_count+1) > buf_rec_size) {
 			buf_rec_size += 32;
 			xrealloc ((agent_args->slurm_addr), 
 			          (sizeof (struct sockaddr_in) * buf_rec_size));
 			xrealloc ((agent_args->node_names), 
 			          (MAX_NAME_LEN * buf_rec_size));
 		}
-		agent_args->slurm_addr[agent_args->addr_count] = 
+		agent_args->slurm_addr[agent_args->node_count] = 
 							node_record_table_ptr[i].slurm_addr;
-		strncpy (&agent_args->node_names[MAX_NAME_LEN*agent_args->addr_count],
+		strncpy (&agent_args->node_names[MAX_NAME_LEN*agent_args->node_count],
 		         node_record_table_ptr[i].name, MAX_NAME_LEN);
-		agent_args->addr_count++;
+		agent_args->node_count++;
 		base_state = node_record_table_ptr[i].node_state & (~NODE_STATE_NO_RESPOND);
 		no_resp_flag = node_record_table_ptr[i].node_state & NODE_STATE_NO_RESPOND;
 		if (base_state == NODE_STATE_DRAINING) {
diff --git a/src/slurmctld/slurmctld.h b/src/slurmctld/slurmctld.h
index f5aea3547c7..aedc3816e7b 100644
--- a/src/slurmctld/slurmctld.h
+++ b/src/slurmctld/slurmctld.h
@@ -50,7 +50,7 @@
 #define	PERIODIC_CHECKPOINT	300
 
 /* Retry an incomplete RPC agent request every RPC_RETRY_INTERVAL seconds */
-#define	RPC_RETRY_INTERVAL	300
+#define	RPC_RETRY_INTERVAL	60
 
 /* Attempt to schedule jobs every PERIODIC_SCHEDULE seconds despite any RPC activity 
  * This will catch any state transisions that may have otherwise been missed */
-- 
GitLab