From 5d5be7b2c90e024b545395fd00b8df121330c46c Mon Sep 17 00:00:00 2001
From: "Christopher J. Morrone" <morrone2@llnl.gov>
Date: Fri, 14 Jul 2006 23:45:13 +0000
Subject: [PATCH] Add multiple-message-response socket support to
 slurm_launch_tasks

---
 src/api/step_ctx.h    |  3 +-
 src/api/step_launch.c | 77 ++++++++++++++++++++++++++-----------------
 2 files changed, 49 insertions(+), 31 deletions(-)

diff --git a/src/api/step_ctx.h b/src/api/step_ctx.h
index c9945d169c3..d4700a77565 100644
--- a/src/api/step_ctx.h
+++ b/src/api/step_ctx.h
@@ -55,7 +55,8 @@ struct step_launch_state {
 	/* message thread variables */
 	eio_handle_t *msg_handle;
 	pthread_t msg_thread;
-	uint16_t msg_port;
+	uint16_t num_resp_port;
+	uint16_t *resp_port; /* array of message response ports */
 
 	/* client side io variables */
 	client_io_t *client_io;
diff --git a/src/api/step_launch.c b/src/api/step_launch.c
index 9e7426d2afb..6cad0881974 100644
--- a/src/api/step_launch.c
+++ b/src/api/step_launch.c
@@ -61,8 +61,6 @@
 #include "src/api/step_ctx.h"
 #include "src/api/step_pmi.h"
 
-#define STEP_LAUNCH_TIMEOUT 10 /* FIXME - should be defined elsewhere */
-
 /**********************************************************************
  * General declarations for step launch code
  **********************************************************************/
@@ -78,7 +76,7 @@ static client_io_t *_setup_step_client_io(slurm_step_ctx ctx,
  * Message handler declarations
  **********************************************************************/
 static uid_t  slurm_uid;
-static int _msg_thr_create(struct step_launch_state *sls);
+static int _msg_thr_create(struct step_launch_state *sls, int num_nodes);
 static void _handle_msg(struct step_launch_state *sls, slurm_msg_t *msg);
 static bool _message_socket_readable(eio_obj_t *obj);
 static int _message_socket_accept(eio_obj_t *obj, List objs);
@@ -157,8 +155,8 @@ int slurm_step_launch (slurm_step_ctx ctx,
 	ctx->launch_state->task_start_callback = params->task_start_callback;
 	ctx->launch_state->task_finish_callback = params->task_finish_callback;
 
-	/* Create message receiving socket and handler thread */
-	_msg_thr_create(ctx->launch_state);
+	/* Create message receiving sockets and handler thread */
+	_msg_thr_create(ctx->launch_state, ctx->step_req->node_count);
 
 	/* Start tasks on compute nodes */
 	launch.job_id = ctx->alloc_resp->job_id;
@@ -211,17 +209,17 @@ int slurm_step_launch (slurm_step_ctx ctx,
 	if (client_io_handler_start(ctx->launch_state->client_io) != SLURM_SUCCESS)
 		return SLURM_ERROR;
 
-	launch.io_port = xmalloc(sizeof(uint16_t) * launch.nnodes);
-	launch.resp_port = xmalloc(sizeof(uint16_t) * launch.nnodes);
-	
 	launch.num_io_port = ctx->launch_state->client_io->num_listen;
+	launch.io_port = xmalloc(sizeof(uint16_t) * launch.num_io_port);
 	for (i = 0; i < launch.num_io_port; i++) {
 		launch.io_port[i] =
 			ntohs(ctx->launch_state->client_io->listenport[i]);
 	}
-	launch.num_resp_port = 1;
+	
+	launch.num_resp_port = ctx->launch_state->num_resp_port;
+	launch.resp_port = xmalloc(sizeof(uint16_t) * launch.num_resp_port);
 	for (i = 0; i < launch.num_resp_port; i++) {
-		launch.resp_port[i] = ntohs(ctx->launch_state->msg_port);
+		launch.resp_port[i] = ntohs(ctx->launch_state->resp_port[i]);
 	}
 
 	_launch_tasks(ctx, &launch);
@@ -274,6 +272,7 @@ void slurm_step_launch_wait_finish(slurm_step_ctx ctx)
 	/* FIXME - put these in an sls-specific desctructor */
 	pthread_mutex_destroy(&sls->lock);
 	pthread_cond_destroy(&sls->cond);
+	xfree(sls->resp_port);
 }
 
 /**********************************************************************
@@ -288,28 +287,44 @@ static void *_msg_thr_internal(void *arg)
 	return NULL;
 }
 
-static int _msg_thr_create(struct step_launch_state *sls)
+static inline int
+_estimate_nports(int nclients, int cli_per_port)	/* ports needed when each port serves cli_per_port clients */
+{
+	div_t d;
+	d = div(nclients, cli_per_port);	/* quot = full groups, rem = leftover clients; cli_per_port must be nonzero */
+	return d.rem > 0 ? d.quot + 1 : d.quot;	/* leftover clients get one extra port (round up) */
+}
+
+static int _msg_thr_create(struct step_launch_state *sls, int num_nodes)	/* SLURM_SUCCESS or SLURM_ERROR */
 {
 	int sock = -1;
 	int port = -1;
 	eio_obj_t *obj;
+	int i;	/* index into sls->resp_port */
 
 	debug("Entering _msg_thr_create()");
 	slurm_uid = (uid_t) slurm_get_slurm_user_id();
 
-	if (net_stream_listen(&sock, &port) < 0) {
-		error("unable to intialize step launch listening socket: %m");
-		return SLURM_ERROR;
+	sls->msg_handle = eio_handle_create();	/* created before the loop so each socket can be added to it */
+	sls->num_resp_port = _estimate_nports(num_nodes, 48);	/* one response port per 48 nodes, rounded up */
+	sls->resp_port = xmalloc(sizeof(uint16_t) * sls->num_resp_port);	/* freed in slurm_step_launch_wait_finish() */
+	for (i = 0; i < sls->num_resp_port; i++) {
+		if (net_stream_listen(&sock, &port) < 0) {	/* presumably fills in sock and port -- confirm */
+			error("unable to intialize step launch listening socket: %m");
+			return SLURM_ERROR;	/* NOTE(review): resp_port, msg_handle and earlier sockets are not released here */
+		}
+		sls->resp_port[i] = port;	/* int port stored into a uint16_t slot */
+		obj = eio_obj_create(sock, &message_socket_ops, (void *)sls);	/* sls is passed as the object's argument */
+		eio_new_initial_obj(sls->msg_handle, obj);
 	}
-	sls->msg_port = port;
 
-	obj = eio_obj_create(sock, &message_socket_ops, (void *)sls);
-
-	sls->msg_handle = eio_handle_create();
-	eio_new_initial_obj(sls->msg_handle, obj);
-	/* FIXME check return code */
-	pthread_create(&sls->msg_thread, NULL, _msg_thr_internal, (void *)sls);
+	if (pthread_create(&sls->msg_thread, NULL,
+			   _msg_thr_internal, (void *)sls) != 0) {	/* thread entry is _msg_thr_internal(sls) */
+		error("pthread_create of message thread: %m");	/* NOTE(review): pthread_create returns its error code; errno (%m) may be stale */
+		return SLURM_ERROR;
+	}
 
+	return SLURM_SUCCESS;
 }
 
 static bool _message_socket_readable(eio_obj_t *obj)
@@ -339,6 +354,7 @@ static int _message_socket_accept(eio_obj_t *obj, List objs)
 	struct sockaddr_un addr;
 	slurm_msg_t *msg = NULL;
 	int len = sizeof(addr);
+	int          timeout = 0;	/* slurm default value */
 	List ret_list = NULL;
 
 	debug3("Called _msg_socket_accept");
@@ -374,11 +390,13 @@ static int _message_socket_accept(eio_obj_t *obj, List objs)
 	msg->conn_fd = fd;
 	msg->forward_struct_init = 0;
 
+	/* Multiple jobs (easily induced via no_alloc) and highly
+	 * parallel jobs using PMI sometimes result in slow message
+	 * responses and timeouts, so srun waits 8x the default timeout. */
+	timeout = slurm_get_msg_timeout() * 8;
 again:
-	ret_list = slurm_receive_msg(fd, msg, STEP_LAUNCH_TIMEOUT);
+	ret_list = slurm_receive_msg(fd, msg, timeout);
 	if(!ret_list || errno != SLURM_SUCCESS) {
-		printf("error on slurm_recieve_msg\n");
-		fflush(stdout);
 		if (errno == EINTR) {
 			list_destroy(ret_list);
 			goto again;
@@ -388,10 +406,6 @@ again:
 		goto cleanup;
 	}
 	if(list_count(ret_list)>0) {
-		printf("_message_socket_accept connection: "
-		      "got %d from receive, expecting 0\n",
-		       list_count(ret_list));
-		fflush(stdout);
 		error("_message_socket_accept connection: "
 		      "got %d from receive, expecting 0",
 		      list_count(ret_list));
@@ -520,6 +534,7 @@ static int _launch_tasks(slurm_step_ctx ctx,
 	ListIterator ret_data_itr;
 	ret_types_t *ret;
 	ret_data_info_t *ret_data;
+	int timeout;
 /* 	slurm_addr *step_addrs; */ /* array of addresses */
 /* 	int num_step_addrs; */
 
@@ -546,17 +561,19 @@ static int _launch_tasks(slurm_step_ctx ctx,
 	msg.buffer = buffer;
 	memcpy(&msg.address, &ctx->alloc_resp->node_addr[0],
 	       sizeof(slurm_addr));
+	timeout = slurm_get_msg_timeout();
  	forward_set_launch(&msg.forward,
 			   ctx->step_req->node_count,
 			   &zero,
 			   ctx->step_layout,
 			   ctx->alloc_resp->node_addr,
 			   itr,
-			   STEP_LAUNCH_TIMEOUT);
+			   timeout);
 	hostlist_iterator_destroy(itr);
 	hostlist_destroy(hostlist);
 
-	ret_list = slurm_send_recv_rc_packed_msg(&msg, STEP_LAUNCH_TIMEOUT);
+	ret_list =
+		slurm_send_recv_rc_packed_msg(&msg, timeout);
 	if (ret_list == NULL) {
 		error("slurm_send_recv_rc_packed_msg failed miserably: %m");
 /* 		xfree(step_addrs); */
-- 
GitLab