From c3a6f92c068bfd78df0ba1e5df4a9992aaac98ec Mon Sep 17 00:00:00 2001
From: Moe Jette <jette1@llnl.gov>
Date: Mon, 10 Apr 2006 23:13:50 +0000
Subject: [PATCH] Multi-thread sbcast logic (4-way fanout from sbcast itself).

---
 src/sbcast/agent.c | 159 +++++++++++++++++++++++++++++++--------------
 1 file changed, 112 insertions(+), 47 deletions(-)

diff --git a/src/sbcast/agent.c b/src/sbcast/agent.c
index 6aefe87a0eb..981e4bcae75 100644
--- a/src/sbcast/agent.c
+++ b/src/sbcast/agent.c
@@ -32,6 +32,7 @@
 
 #include <errno.h>
 #include <fcntl.h>
+#include <pthread.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
@@ -39,14 +40,19 @@
 
 #include "src/common/hostlist.h"
 #include "src/common/log.h"
+#include "src/common/read_config.h"
 #include "src/common/slurm_protocol_api.h"
+#include "src/common/slurm_protocol_defs.h"
 #include "src/common/slurm_protocol_interface.h"
 #include "src/common/xmalloc.h"
 #include "src/common/xstring.h"
 #include "src/common/forward.h"
 #include "src/sbcast/sbcast.h"
 
-#define AGENT_THREAD_COUNT 10
+#define MAX_RETRIES     10
+#define MAX_THREADS      4	/* These are huge messages, so only
+				 * run MAX_THREADS at one time */
+
 typedef enum {
 	DSH_NEW,	/* Request not yet started */
 	DSH_ACTIVE,	/* Request in progress */
@@ -56,32 +62,31 @@ typedef enum {
 } state_t;
 
 typedef struct thd {
-	pthread_t thread;		/* thread ID */
-	pthread_attr_t attr;		/* thread attributes */
-	state_t state;			/* thread state */
-	time_t start_time;		/* start time */
-	time_t end_time;		/* end time or delta time
-					 * upon termination */
-	struct sockaddr_in slurm_addr;	/* structure holding info for all
-					 * forwarding info */
-	char node_name[MAX_SLURM_NAME];	/* node's name */
-	List ret_list;
+	pthread_t thread;	/* thread ID */
+	pthread_attr_t attr;	/* thread attributes */
+	slurm_msg_t *msg;	/* message to send */
+	int rc;			/* highest return codes from RPC */
 } thd_t;
 
+static pthread_mutex_t agent_cnt_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  agent_cnt_cond  = PTHREAD_COND_INITIALIZER;
+static int agent_cnt = 0;
+
+static void *_agent_thread(void *args);
+
 /* issue the RPC to ship the file's data */
 extern void send_rpc(file_bcast_msg_t *bcast_msg, 
 		resource_allocation_response_msg_t *alloc_resp)
 {
 	/* This code will handle message fanout to multiple slurmd */
-	forward_t from, forward;
-	slurm_msg_t msg;
+	forward_t from, *forward;
 	hostlist_t hl;
-	List ret_list = NULL;
-	ListIterator itr, data_itr;
-	ret_types_t *ret_type = NULL;
-	ret_data_info_t *ret_data_info = NULL;
-	int i, rc = SLURM_SUCCESS;
+	int i, rc = 0, retries = 0;
+	int thr_count = 0, *span = set_span(alloc_resp->node_cnt);
+	thd_t *thread_info;
 
+	thread_info	= xmalloc(slurm_get_tree_width() * sizeof(thd_t));
+	forward         = xmalloc(slurm_get_tree_width() * sizeof(forward_t));
 	from.cnt	= alloc_resp->node_cnt;
 	from.name	= xmalloc(MAX_SLURM_NAME * alloc_resp->node_cnt);
 	hl = hostlist_create(alloc_resp->node_list);
@@ -94,18 +99,73 @@ extern void send_rpc(file_bcast_msg_t *bcast_msg,
 	from.addr	= alloc_resp->node_addr;
 	from.node_id	= NULL;
 	from.timeout	= SLURM_MESSAGE_TIMEOUT_MSEC_STATIC;
-	i = 0;
-	forward_set(&forward, alloc_resp->node_cnt, &i, &from);
-
-	msg.msg_type	= REQUEST_FILE_BCAST;
-	msg.address	= alloc_resp->node_addr[0];
-	msg.data	= bcast_msg;
-	msg.forward	= forward;
-	msg.ret_list	= NULL;
-	msg.orig_addr.sin_addr.s_addr = 0;
-	msg.srun_node_id = 0;
-
-	ret_list = slurm_send_recv_rc_msg(&msg, 
+	for (i=0; i<alloc_resp->node_cnt; i++) {
+		forward_set(&forward[i], span[thr_count], &i, &from);
+		thread_info[i].msg = xmalloc(sizeof(slurm_msg_t));
+		thread_info[i].msg->msg_type	= REQUEST_FILE_BCAST;
+		thread_info[i].msg->data	= bcast_msg;
+		thread_info[i].msg->forward	= forward[i];
+		thread_info[i].msg->ret_list	= NULL;
+		thread_info[i].msg->orig_addr.sin_addr.s_addr = 0;
+		thread_info[i].msg->srun_node_id= 0;
+		thread_info[i].rc		= -1;
+		thread_info[i].msg->address	= alloc_resp->node_addr[i];
+		thr_count++;
+	}
+	xfree(span);
+	debug("spawning %d threads", thr_count);
+
+	for (i=0; i<thr_count; i++) {
+		slurm_mutex_lock(&agent_cnt_mutex);
+		while (agent_cnt >= MAX_THREADS)
+			pthread_cond_wait(&agent_cnt_cond, &agent_cnt_mutex);
+		agent_cnt++;
+		slurm_mutex_unlock(&agent_cnt_mutex);
+
+		slurm_attr_init(&thread_info[i].attr);
+		if (pthread_attr_setdetachstate (&thread_info[i].attr,
+				PTHREAD_CREATE_JOINABLE))
+			error("pthread_attr_setdetachstate error %m");
+		while (pthread_create(&thread_info[i].thread, 
+				&thread_info[i].attr, 
+				_agent_thread,
+				(void *) &thread_info[i])) {
+			error("pthread_create error %m");
+			if (++retries > MAX_RETRIES)
+				fatal("Can't create pthread");
+			sleep(1); 	/* sleep and again */
+		}
+	}
+
+	/* wait until pthreads complete */
+	slurm_mutex_lock(&agent_cnt_mutex);
+	while (agent_cnt)
+		pthread_cond_wait(&agent_cnt_cond, &agent_cnt_mutex);
+	slurm_mutex_unlock(&agent_cnt_mutex);
+
+	for (i=0; i<thr_count; i++) {
+		rc = MAX(rc, thread_info[i].rc);
+		xfree(thread_info[i].msg);
+		destroy_forward(&forward[i]);
+	}
+	xfree(from.name);
+	xfree(forward);
+	xfree(thread_info);
+	if (rc)
+		exit(1);
+}
+
+static void *_agent_thread(void *args)
+{
+	List ret_list = NULL;
+	ret_types_t *ret_type = NULL;
+	thd_t *thread_ptr = (thd_t *) args;
+	slurm_msg_t *msg = thread_ptr->msg;
+	ListIterator itr, data_itr;
+	ret_data_info_t *ret_data_info = NULL;
+	int rc = 0;
+
+	ret_list = slurm_send_recv_rc_msg(msg, 
 			SLURM_MESSAGE_TIMEOUT_MSEC_STATIC);
 	if (ret_list == NULL) {
 		error("slurm_send_recv_rc_msg: %m");
@@ -115,28 +175,33 @@ extern void send_rpc(file_bcast_msg_t *bcast_msg,
 	itr = list_iterator_create(ret_list);
 	while ((ret_type = list_next(itr)) != NULL) {
 		data_itr = list_iterator_create(ret_type->ret_data_list);
-		while((ret_data_info = list_next(data_itr)) != NULL) {
-			i = ret_type->msg_rc;
-			if (i != SLURM_SUCCESS) {
-				if (!strcmp(ret_data_info->node_name,
-						"localhost")) {
-					xfree(ret_data_info->node_name);
-					ret_data_info->node_name =
-						xstrdup(from.name);
-				}
-				error("REQUEST_FILE_BCAST(%s): %s",
-					ret_data_info->node_name,
-					slurm_strerror(i));
-				rc = i;
+		while ((ret_data_info = list_next(data_itr)) != NULL) {
+			if (ret_type->msg_rc == SLURM_SUCCESS)
+				continue;
+			if (!strcmp(ret_data_info->node_name,
+					"localhost")) {
+				xfree(ret_data_info->node_name);
+				ret_data_info->node_name = 
+					xmalloc(MAX_SLURM_NAME);
+				getnodename(ret_data_info->node_name, 
+					MAX_SLURM_NAME);
 			}
+			error("REQUEST_FILE_BCAST(%s): %s",
+				ret_data_info->node_name,
+				slurm_strerror(ret_type->msg_rc));
+			rc = MAX(rc, ret_type->msg_rc);
 		}
 		list_iterator_destroy(data_itr);
 	}
+
+	thread_ptr->rc = rc;
 	list_iterator_destroy(itr);
 	if (ret_list)
 		list_destroy(ret_list);
-	xfree(from.name);
-	destroy_forward(&forward);
-	if (rc)
-		exit(1);
+	slurm_mutex_lock(&agent_cnt_mutex);
+	agent_cnt--;
+	slurm_mutex_unlock(&agent_cnt_mutex);
+	pthread_cond_broadcast(&agent_cnt_cond);
+	return NULL;
 }
+
-- 
GitLab