From 1dacc6076185dbf27f9c6e6044afc36834c14538 Mon Sep 17 00:00:00 2001
From: Danny Auble <da@llnl.gov>
Date: Thu, 7 May 2009 23:15:05 +0000
Subject: [PATCH] change fanout logic to start on calling node instead of first
 node in message nodelist.

---
 NEWS                            |   2 +
 doc/html/faq.shtml              |  14 +++
 src/common/forward.c            | 215 ++++++++++++++++++++++++++++++--
 src/common/forward.h            |  16 ++-
 src/common/slurm_protocol_api.c | 190 +++++++++++++++-------------
 5 files changed, 336 insertions(+), 101 deletions(-)

diff --git a/NEWS b/NEWS
index 2c79795ba04..3558838dbc9 100644
--- a/NEWS
+++ b/NEWS
@@ -3,6 +3,8 @@ documents those changes that are of interest to users and admins.
 
 * Changes in SLURM 2.0.0
 ==============================
+ -- Change fanout logic to start on calling node instead of first node in
+    message nodelist.
 
 * Changes in SLURM 2.0.0-rc1
 ==============================
diff --git a/doc/html/faq.shtml b/doc/html/faq.shtml
index 295f1b918f7..b39034e1623 100644
--- a/doc/html/faq.shtml
+++ b/doc/html/faq.shtml
@@ -110,6 +110,8 @@ running even though the job is supposed to be completed?</li>
 <li><a href="#slurmd_oom">How can I prevent the <i>slurmd</i> and
 <i>slurmstepd</i> daemons from being killed when a node's memory 
 is exhausted?</li>
+<li><a href="#ubuntu">I see the address of my calling node as 127.0.1.1
+    instead of the correct IP address.  Why is that?</a></li>
 </ol>
 
 
@@ -1187,6 +1189,18 @@ daemon with the <i>SLURMD_OOM_ADJ</i> and/or <i>SLURMSTEPD_OOM_ADJ</i>
 environment variables set to the desired values.
 A value of -17 typically will disable killing.</p>
 
+<p><a name="ubuntu"><b>38. I see the address of my calling node as 127.0.1.1
+    instead of the correct IP address.  Why is that?</b></a><br />
+Some systems by default will put your host in the /etc/hosts file as
+    something like 
+<pre>
+127.0.1.1	snowflake.llnl.gov	snowflake
+</pre>
+This will cause srun and other tools to grab 127.0.1.1 as its
+address instead of the correct address, which breaks
+communication.  The solution is to either remove this line or
+set a different NodeAddr that is known by your other nodes.</p>
+
 <p class="footer"><a href="#top">top</a></p>
 
 <p style="text-align:center;">Last modified 25 March 2009</p>
diff --git a/src/common/forward.c b/src/common/forward.c
index b0552ed4b24..f8e13c624fe 100644
--- a/src/common/forward.c
+++ b/src/common/forward.c
@@ -62,6 +62,24 @@
 
 #define MAX_RETRIES 3
 
+typedef struct {
+	pthread_cond_t *notify;
+	slurm_msg_t *orig_msg;
+	List ret_list;
+	int timeout;
+	hostlist_t tree_hl;
+	pthread_mutex_t *tree_mutex;
+} fwd_tree_t;
+
+void _destroy_tree_fwd(fwd_tree_t *fwd_tree)
+{
+	if(fwd_tree) {
+		if(fwd_tree->tree_hl)
+			hostlist_destroy(fwd_tree->tree_hl);
+		xfree(fwd_tree);
+	}
+}
+
 void *_forward_thread(void *arg)
 {
 	forward_msg_t *fwd_msg = (forward_msg_t *)arg;
@@ -283,6 +301,85 @@ cleanup:
 	return (NULL);
 }
 
+void *_fwd_tree_thread(void *arg)
+{
+	fwd_tree_t *fwd_tree = (fwd_tree_t *)arg;
+	List ret_list = NULL;
+	char *name = NULL;
+	char buf[8196];
+	slurm_msg_t send_msg;	
+	
+	slurm_msg_t_init(&send_msg);
+	send_msg.msg_type = fwd_tree->orig_msg->msg_type;
+	send_msg.data = fwd_tree->orig_msg->data;
+
+	/* repeat until we are sure the message was sent */ 
+	while((name = hostlist_shift(fwd_tree->tree_hl))) {
+		if(slurm_conf_get_addr(name, &send_msg.address)
+		   == SLURM_ERROR) {
+			error("fwd_tree_thread: can't find address for host "
+			      "%s, check slurm.conf", name);
+			slurm_mutex_lock(fwd_tree->tree_mutex);
+			mark_as_failed_forward(&fwd_tree->ret_list, name,
+					SLURM_COMMUNICATIONS_CONNECTION_ERROR);
+ 			pthread_cond_signal(fwd_tree->notify);
+			slurm_mutex_unlock(fwd_tree->tree_mutex);
+			free(name);
+		
+			continue;
+		}
+		
+		hostlist_ranged_string(fwd_tree->tree_hl, sizeof(buf), buf);
+		send_msg.forward.nodelist = xstrdup(buf);
+		send_msg.forward.timeout = fwd_tree->timeout;
+		send_msg.forward.cnt = hostlist_count(fwd_tree->tree_hl);
+		if (send_msg.forward.nodelist[0]) {
+			debug3("Tree sending to %s along with %s", 
+			       name, send_msg.forward.nodelist);
+		} else
+			debug3("Tree sending to %s", name);
+
+		ret_list = slurm_send_addr_recv_msgs(&send_msg, name,
+						     fwd_tree->timeout);
+
+		xfree(send_msg.forward.nodelist);
+
+		if(ret_list) {
+			slurm_mutex_lock(fwd_tree->tree_mutex);
+			list_transfer(fwd_tree->ret_list, ret_list);
+			pthread_cond_signal(fwd_tree->notify);
+			slurm_mutex_unlock(fwd_tree->tree_mutex);
+			list_destroy(ret_list);
+		} else {
+			/* This should never happen (when this was
+			   written slurm_send_addr_recv_msgs always
+			   returned a list) */
+			error("fwd_tree_thread: no return list given from "
+			      "slurm_send_addr_recv_msgs spawned for %s", name);
+			slurm_mutex_lock(fwd_tree->tree_mutex);
+			mark_as_failed_forward(&fwd_tree->ret_list, name,
+					SLURM_COMMUNICATIONS_CONNECTION_ERROR);
+ 			pthread_cond_signal(fwd_tree->notify);
+			slurm_mutex_unlock(fwd_tree->tree_mutex);
+			free(name);
+			
+			continue;
+		}
+
+		free(name);
+		
+		/* check for error and try again */
+		if(errno == SLURM_COMMUNICATIONS_CONNECTION_ERROR) 
+ 			continue;						
+		
+		break;
+	}
+
+	_destroy_tree_fwd(fwd_tree);
+		
+	return NULL;
+}
+
 /*
  * forward_init    - initilize forward structure
  * IN: forward     - forward_t *   - struct to store forward info
@@ -320,7 +417,7 @@ extern void forward_init(forward_t *forward, forward_t *from)
 extern int forward_msg(forward_struct_t *forward_struct, 
 		       header_t *header)
 {
-	int i = 0, j = 0;
+	int j = 0;
 	int retries = 0;
 	forward_msg_t *forward_msg = NULL;
 	int thr_count = 0;
@@ -335,8 +432,7 @@ extern int forward_msg(forward_struct_t *forward_struct,
 		return SLURM_ERROR;
 	}
 	hl = hostlist_create(header->forward.nodelist);	
-	
-	i = 0;
+	hostlist_uniq(hl);
 	
 	while((name = hostlist_shift(hl))) {
 		pthread_attr_t attr_agent;
@@ -375,7 +471,6 @@ extern int forward_msg(forward_struct_t *forward_struct,
 		forward_msg->header.ret_cnt = 0;
 		
 		forward_hl = hostlist_create(name);
-		i++;
 		free(name);
 		for(j = 0; j < span[thr_count]; j++) {
 			name = hostlist_shift(hl);
@@ -383,9 +478,8 @@ extern int forward_msg(forward_struct_t *forward_struct,
 				break;
 			hostlist_push(forward_hl, name);
 			free(name);
-			i++;
 		}
-		hostlist_uniq(forward_hl);
+
 		hostlist_ranged_string(forward_hl, sizeof(buf), buf);
 		hostlist_destroy(forward_hl);
 		forward_init(&forward_msg->header.forward, NULL);
@@ -406,6 +500,104 @@ extern int forward_msg(forward_struct_t *forward_struct,
 	return SLURM_SUCCESS;
 }
 
+/*
+ * start_msg_tree  - logic to begin the forward tree and
+ *                   accumulate the return codes from processes getting
+ *                   the forwarded message
+ *
+ * IN: hl          - hostlist_t   - list of every node to send message to
+ * IN: msg         - slurm_msg_t  - message to send.
+ * IN: timeout     - int          - how long to wait in milliseconds.
+ * RET List 	   - List containing the responses of the children
+ *		     (if any) we forwarded the message to. List
+ *		     containing type (ret_data_info_t).
+ */
+extern List start_msg_tree(hostlist_t hl, slurm_msg_t *msg, int timeout)
+{
+	int *span = NULL;
+	fwd_tree_t *fwd_tree = NULL;
+	pthread_mutex_t tree_mutex;
+	pthread_cond_t notify;
+	int j = 0, count = 0;
+	List ret_list = NULL;
+	char *name = NULL;
+	int thr_count = 0;
+	int host_count = 0;
+
+	xassert(hl);
+	xassert(msg);
+
+	hostlist_uniq(hl);		
+	host_count = hostlist_count(hl);
+
+	span = set_span(host_count, 0);
+
+	slurm_mutex_init(&tree_mutex);
+	pthread_cond_init(&notify, NULL);
+
+	ret_list = list_create(destroy_data_info);
+	
+	while((name = hostlist_shift(hl))) {
+		pthread_attr_t attr_agent;
+		pthread_t thread_agent;
+		int retries = 0;
+
+		slurm_attr_init(&attr_agent);
+		if (pthread_attr_setdetachstate
+		    (&attr_agent, PTHREAD_CREATE_DETACHED))
+			error("pthread_attr_setdetachstate error %m");
+
+		fwd_tree = xmalloc(sizeof(fwd_tree_t));
+		fwd_tree->orig_msg = msg;
+		fwd_tree->ret_list = ret_list;
+		fwd_tree->timeout = timeout;
+		fwd_tree->notify = &notify;
+		fwd_tree->tree_mutex = &tree_mutex;
+
+		if(fwd_tree->timeout <= 0) {
+			/* convert secs to msec */
+			fwd_tree->timeout  = slurm_get_msg_timeout() * 1000; 
+		}
+
+		fwd_tree->tree_hl = hostlist_create(name);
+		free(name);
+		for(j = 0; j < span[thr_count]; j++) {
+			name = hostlist_shift(hl);
+			if(!name)
+				break;
+			hostlist_push(fwd_tree->tree_hl, name);
+			free(name);
+		}
+
+		while(pthread_create(&thread_agent, &attr_agent,
+				     _fwd_tree_thread, (void *)fwd_tree)) {
+			error("pthread_create error %m");
+			if (++retries > MAX_RETRIES)
+				fatal("Can't create pthread");
+			sleep(1);	/* sleep and try again */
+		}
+		slurm_attr_destroy(&attr_agent);
+		thr_count++; 
+	}
+	xfree(span);
+	
+	slurm_mutex_lock(&tree_mutex);
+
+	count = list_count(ret_list);
+	debug2("Tree head got back %d looking for %d", count, host_count);
+	while((count < host_count)) {
+		pthread_cond_wait(&notify, &tree_mutex);
+		count = list_count(ret_list);
+		debug2("Tree head got back %d", count);
+	}
+	debug2("Tree head got them all");
+	slurm_mutex_unlock(&tree_mutex);
+
+	slurm_mutex_destroy(&tree_mutex);
+	pthread_cond_destroy(&notify);
+
+	return ret_list;
+}
 
 /*
  * mark_as_failed_forward- mark a node as failed and add it to "ret_list"
@@ -441,16 +633,16 @@ extern void forward_wait(slurm_msg_t * msg)
 		debug2("looking for %d", msg->forward_struct->fwd_cnt);
 		slurm_mutex_lock(&msg->forward_struct->forward_mutex);
 		count = 0;
-		if (msg->ret_list != NULL) {
-			count += list_count(msg->ret_list);
-		}
+		if (msg->ret_list != NULL) 
+			count = list_count(msg->ret_list);
+		
 		debug2("Got back %d", count);
 		while((count < msg->forward_struct->fwd_cnt)) {
 			pthread_cond_wait(&msg->forward_struct->notify, 
 					  &msg->forward_struct->forward_mutex);
-			count = 0;
+			
 			if (msg->ret_list != NULL) {
-				count += list_count(msg->ret_list);
+				count = list_count(msg->ret_list);
 			}
 			debug2("Got back %d", count);
 				
@@ -493,4 +685,3 @@ void destroy_forward_struct(forward_struct_t *forward_struct)
 		xfree(forward_struct);
 	}
 }
-
diff --git a/src/common/forward.h b/src/common/forward.h
index de4066ca266..da707a57123 100644
--- a/src/common/forward.h
+++ b/src/common/forward.h
@@ -88,6 +88,21 @@ if (forward_msg(forward_struct, &header) == SLURM_ERROR) {
 extern int forward_msg(forward_struct_t *forward_struct, 
 		       header_t *header);
 
+
+/*
+ * start_msg_tree  - logic to begin the forward tree and
+ *                   accumulate the return codes from processes getting
+ *                   the forwarded message
+ *
+ * IN: hl          - hostlist_t   - list of every node to send message to
+ * IN: msg         - slurm_msg_t  - message to send.
+ * IN: timeout     - int          - how long to wait in milliseconds.
+ * RET List 	   - List containing the responses of the children
+ *		     (if any) we forwarded the message to. List
+ *		     containing type (ret_data_info_t).
+ */
+extern List start_msg_tree(hostlist_t hl, slurm_msg_t *msg, int timeout);
+
 /*
  * mark_as_failed_forward- mark a node as failed and add it to "ret_list"
  *
@@ -129,6 +144,5 @@ if(!ret_list || list_count(ret_list) == 0) {
 extern void destroy_data_info(void *object);
 extern void destroy_forward(forward_t *forward);
 extern void destroy_forward_struct(forward_struct_t *forward_struct);
-extern void destroy_ret_types(void *object);
 	
 #endif
diff --git a/src/common/slurm_protocol_api.c b/src/common/slurm_protocol_api.c
index 4f35dcc7b45..c54f4b461e5 100644
--- a/src/common/slurm_protocol_api.c
+++ b/src/common/slurm_protocol_api.c
@@ -2042,11 +2042,11 @@ List slurm_receive_msgs(slurm_fd fd, int steps, int timeout)
 	}
 	//info("ret_cnt = %d",header.ret_cnt);
 	if(header.ret_cnt > 0) {
-		ret_list = list_create(destroy_data_info);
-		while((ret_data_info = list_pop(header.ret_list)))
-			list_push(ret_list, ret_data_info);
+		if(header.ret_list)
+			ret_list = header.ret_list;
+		else
+			ret_list = list_create(destroy_data_info);
 		header.ret_cnt = 0;
-		list_destroy(header.ret_list);
 		header.ret_list = NULL;
 	}
 	
@@ -3085,18 +3085,19 @@ List slurm_send_recv_msgs(const char *nodelist, slurm_msg_t *msg,
 			  int timeout, bool quiet)
 {
 	List ret_list = NULL;
-	List tmp_ret_list = NULL;
-	slurm_fd fd = -1;
-	char *name = NULL;
-	char buf[8192];
+//	List tmp_ret_list = NULL;
+//	slurm_fd fd = -1;
+//	char *name = NULL;
+//	char buf[8192];
 	hostlist_t hl = NULL;
-	ret_data_info_t *ret_data_info = NULL;
-	ListIterator itr;
+//	ret_data_info_t *ret_data_info = NULL;
+//	ListIterator itr;
 
 	if(!nodelist || !strlen(nodelist)) {
 		error("slurm_send_recv_msgs: no nodelist given");
 		return NULL;
 	}
+	
 #ifdef HAVE_FRONT_END
 	/* only send to the front end node */
 	name = nodelist_nth_host(nodelist, 0);
@@ -3112,85 +3113,99 @@ List slurm_send_recv_msgs(const char *nodelist, slurm_msg_t *msg,
 /* 	info("total sending to %s",nodelist); */
 	hl = hostlist_create(nodelist);
 #endif
-	while((name = hostlist_shift(hl))) {
-		
-		if(slurm_conf_get_addr(name, &msg->address) == SLURM_ERROR) {
-			if (quiet) {
-				debug("slurm_send_recv_msgs: can't find "
-				      "address for host %s, check slurm.conf", 
-				      name);
-			} else {
-				error("slurm_send_recv_msgs: can't find "
-				      "address for host %s, check slurm.conf", 
-				      name);
-			}
-			mark_as_failed_forward(&tmp_ret_list, name, 
-					SLURM_COMMUNICATIONS_CONNECTION_ERROR);
-			free(name);
-			continue;
-		}
-		
-		if ((fd = slurm_open_msg_conn(&msg->address)) < 0) {
-			if (quiet)
-				debug("slurm_send_recv_msgs to %s: %m", name);
-			else
-				error("slurm_send_recv_msgs to %s: %m", name);
-			mark_as_failed_forward(&tmp_ret_list, name, 
-					SLURM_COMMUNICATIONS_CONNECTION_ERROR);
-			free(name);
-			continue;
-		}
 
-		hostlist_ranged_string(hl, sizeof(buf), buf);
-		forward_init(&msg->forward, NULL);
-		msg->forward.nodelist = xstrdup(buf);
-		msg->forward.timeout = timeout;
-		msg->forward.cnt = hostlist_count(hl);
-		if (msg->forward.nodelist[0]) {
-			debug3("sending to %s along with to %s", 
-			       name, msg->forward.nodelist);
-		} else
-			debug3("sending to %s", name);
-		
-		if(!(ret_list = _send_and_recv_msgs(fd, msg, timeout))) {
-			xfree(msg->forward.nodelist);
-			if (quiet) {
-				debug("slurm_send_recv_msgs"
-				      "(_send_and_recv_msgs) to %s: %m", 
-				      name);
-			} else {
-				error("slurm_send_recv_msgs"
-				      "(_send_and_recv_msgs) to %s: %m", 
-				      name);
-			}
-			mark_as_failed_forward(&tmp_ret_list, name, errno);
-			free(name);
-			continue;
-		} else {
-			itr = list_iterator_create(ret_list);
-			while((ret_data_info = list_next(itr))) 
-				if(!ret_data_info->node_name) {
-					ret_data_info->node_name =
-						xstrdup(name);
-				}
-			list_iterator_destroy(itr);
-		}
-		xfree(msg->forward.nodelist);
-		free(name);
-		break;		
+	if(!hl) {
+		error("slurm_send_recv_msgs: problem creating hostlist");
+		return NULL;
 	}
+
+	ret_list = start_msg_tree(hl, msg, timeout);
 	hostlist_destroy(hl);
 
-	if(tmp_ret_list) {
-		if(!ret_list)
-			ret_list = tmp_ret_list;
-		else {
-			while((ret_data_info = list_pop(tmp_ret_list))) 
-				list_push(ret_list, ret_data_info);
-			list_destroy(tmp_ret_list);
-		}
-	} 
 	return ret_list;
+
+	/* The below code will start from the first node in the list
+	 * to start the tree.  The start_msg_tree function starts the
+	 * tree from the calling node. */
+
+/* 	while((name = hostlist_shift(hl))) { */
+		
+/* 		if(slurm_conf_get_addr(name, &msg->address) == SLURM_ERROR) { */
+/* 			if (quiet) { */
+/* 				debug("slurm_send_recv_msgs: can't find " */
+/* 				      "address for host %s, check slurm.conf",  */
+/* 				      name); */
+/* 			} else { */
+/* 				error("slurm_send_recv_msgs: can't find " */
+/* 				      "address for host %s, check slurm.conf",  */
+/* 				      name); */
+/* 			} */
+/* 			mark_as_failed_forward(&tmp_ret_list, name,  */
+/* 					SLURM_COMMUNICATIONS_CONNECTION_ERROR); */
+/* 			free(name); */
+/* 			continue; */
+/* 		} */
+		
+/* 		if ((fd = slurm_open_msg_conn(&msg->address)) < 0) { */
+/* 			if (quiet) */
+/* 				debug("slurm_send_recv_msgs to %s: %m", name); */
+/* 			else */
+/* 				error("slurm_send_recv_msgs to %s: %m", name); */
+/* 			mark_as_failed_forward(&tmp_ret_list, name,  */
+/* 					SLURM_COMMUNICATIONS_CONNECTION_ERROR); */
+/* 			free(name); */
+/* 			continue; */
+/* 		} */
+
+/* 		hostlist_ranged_string(hl, sizeof(buf), buf); */
+/* 		forward_init(&msg->forward, NULL); */
+/* 		msg->forward.nodelist = xstrdup(buf); */
+/* 		msg->forward.timeout = timeout; */
+/* 		msg->forward.cnt = hostlist_count(hl); */
+/* 		if (msg->forward.nodelist[0]) { */
+/* 			debug3("sending to %s along with %s",  */
+/* 			       name, msg->forward.nodelist); */
+/* 		} else */
+/* 			debug3("sending to %s", name); */
+		
+/* 		if(!(ret_list = _send_and_recv_msgs(fd, msg, timeout))) { */
+/* 			xfree(msg->forward.nodelist); */
+/* 			if (quiet) { */
+/* 				debug("slurm_send_recv_msgs" */
+/* 				      "(_send_and_recv_msgs) to %s: %m",  */
+/* 				      name); */
+/* 			} else { */
+/* 				error("slurm_send_recv_msgs" */
+/* 				      "(_send_and_recv_msgs) to %s: %m",  */
+/* 				      name); */
+/* 			} */
+/* 			mark_as_failed_forward(&tmp_ret_list, name, errno); */
+/* 			free(name); */
+/* 			continue; */
+/* 		} else { */
+/* 			itr = list_iterator_create(ret_list); */
+/* 			while((ret_data_info = list_next(itr)))  */
+/* 				if(!ret_data_info->node_name) { */
+/* 					ret_data_info->node_name = */
+/* 						xstrdup(name); */
+/* 				} */
+/* 			list_iterator_destroy(itr); */
+/* 		} */
+/* 		xfree(msg->forward.nodelist); */
+/* 		free(name); */
+/* 		break;		 */
+/* 	} */
+/* 	hostlist_destroy(hl); */
+
+/* 	if(tmp_ret_list) { */
+/* 		if(!ret_list) */
+/* 			ret_list = tmp_ret_list; */
+/* 		else { */
+/* 			list_transfer(ret_list, tmp_ret_list); */
+/* 			list_destroy(tmp_ret_list); */
+/* 		} */
+/* 	}  */
+/* 	return ret_list; */
 }
 
 /*
@@ -3205,7 +3220,6 @@ List slurm_send_recv_msgs(const char *nodelist, slurm_msg_t *msg,
 List slurm_send_addr_recv_msgs(slurm_msg_t *msg, char *name, int timeout)
 {
 	List ret_list = NULL;
-	List tmp_ret_list = NULL;
 	slurm_fd fd = -1;
 	ret_data_info_t *ret_data_info = NULL;
 	ListIterator itr;
@@ -3213,15 +3227,15 @@ List slurm_send_addr_recv_msgs(slurm_msg_t *msg, char *name, int timeout)
 	if ((fd = slurm_open_msg_conn(&msg->address)) < 0) {
 		mark_as_failed_forward(&ret_list, name, 
 				       SLURM_COMMUNICATIONS_CONNECTION_ERROR);
+		errno = SLURM_COMMUNICATIONS_CONNECTION_ERROR;
 		return ret_list;
 	}
 
-	/*just to make sure */
-	forward_init(&msg->forward, NULL);
 	msg->ret_list = NULL;
 	msg->forward_struct = NULL;
 	if(!(ret_list = _send_and_recv_msgs(fd, msg, timeout))) {
-		mark_as_failed_forward(&tmp_ret_list, name, errno);
+		mark_as_failed_forward(&ret_list, name, errno);
+		errno = SLURM_COMMUNICATIONS_CONNECTION_ERROR;
 		return ret_list;
 	} else {
 		itr = list_iterator_create(ret_list);
-- 
GitLab