From 412768d00f42c49af37d4d194395b9a787bc0397 Mon Sep 17 00:00:00 2001
From: "Christopher J. Morrone" <morrone2@llnl.gov>
Date: Wed, 30 Aug 2006 03:10:32 +0000
Subject: [PATCH] Overhaul the reattach RPC to have a single common message
 body for all nodes.  This will allow us to use tree communication for the
 reattach RPC in sattach.

---
 src/common/slurm_protocol_defs.c |  5 ++---
 src/common/slurm_protocol_defs.h | 14 ++++++-------
 src/common/slurm_protocol_pack.c | 35 ++++++++++++++++++++------------
 src/slurmd/common/stepd_api.c    |  1 +
 src/slurmd/common/stepd_api.h    |  1 +
 src/slurmd/slurmd/req.c          | 10 ++++++---
 src/slurmd/slurmstepd/req.c      |  1 +
 src/srun/reattach.c              | 35 +++++++++++++-------------------
 8 files changed, 55 insertions(+), 47 deletions(-)

diff --git a/src/common/slurm_protocol_defs.c b/src/common/slurm_protocol_defs.c
index 9b4c21a3098..6f4c15e0159 100644
--- a/src/common/slurm_protocol_defs.c
+++ b/src/common/slurm_protocol_defs.c
@@ -406,9 +406,8 @@ void slurm_free_spawn_task_request_msg(spawn_task_request_msg_t * msg)
 void slurm_free_reattach_tasks_request_msg(reattach_tasks_request_msg_t *msg)
 {
 	if (msg) {
-		xfree(msg->ofname);
-		xfree(msg->efname);
-		xfree(msg->ifname);
+		xfree(msg->resp_port);
+		xfree(msg->io_port);
 		slurm_cred_destroy(msg->cred);
 		xfree(msg);
 	}
diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h
index 81e4d37111d..aad5c025c5e 100644
--- a/src/common/slurm_protocol_defs.h
+++ b/src/common/slurm_protocol_defs.h
@@ -472,13 +472,13 @@ typedef struct job_time_msg {
 typedef struct reattach_tasks_request_msg {
 	uint32_t     job_id;
 	uint32_t     job_step_id;
-	uint32_t     srun_node_id;
-	uint16_t     resp_port;
-	uint16_t     io_port;
-	char        *ofname;
-	char        *efname;
-	char        *ifname;
-	slurm_cred_t cred;
+	uint16_t     num_resp_port;
+	uint16_t    *resp_port; /* array of available response ports */
+	uint16_t     num_io_port;
+	uint16_t    *io_port;   /* array of available client IO ports */
+	slurm_cred_t cred;      /* used only as a weak authentication mechanism
+				   for the slurmstepd to use when connecting
+				   back to the client */
 } reattach_tasks_request_msg_t;
 
 typedef struct reattach_tasks_response_msg {
diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c
index 8a69a345695..488eb62c295 100644
--- a/src/common/slurm_protocol_pack.c
+++ b/src/common/slurm_protocol_pack.c
@@ -2489,15 +2489,18 @@ static void
 _pack_reattach_tasks_request_msg(reattach_tasks_request_msg_t * msg,
 				 Buf buffer)
 {
+	int i;
+
 	xassert(msg != NULL);
 	pack32((uint32_t)msg->job_id, buffer);
 	pack32((uint32_t)msg->job_step_id, buffer);
-	pack32((uint32_t)msg->srun_node_id, buffer);
-	pack16((uint16_t)msg->resp_port, buffer);
-	pack16((uint16_t)msg->io_port, buffer);
-	packstr(msg->ofname, buffer);
-	packstr(msg->efname, buffer);
-	packstr(msg->ifname, buffer);
+	pack16((uint16_t)msg->num_resp_port, buffer);
+	for(i = 0; i < msg->num_resp_port; i++)
+		pack16((uint16_t)msg->resp_port[i], buffer);
+	pack16((uint16_t)msg->num_io_port, buffer);
+	for(i = 0; i < msg->num_io_port; i++)
+		pack16((uint16_t)msg->io_port[i], buffer);
+
 	slurm_cred_pack(msg->cred, buffer);
 }
 
@@ -2505,8 +2508,8 @@ static int
 _unpack_reattach_tasks_request_msg(reattach_tasks_request_msg_t ** msg_ptr,
 				   Buf buffer)
 {
-	uint16_t uint16_tmp;
 	reattach_tasks_request_msg_t *msg;
+	int i;
 
 	xassert(msg_ptr != NULL);
 	msg = xmalloc(sizeof(*msg));
@@ -2514,12 +2517,18 @@ _unpack_reattach_tasks_request_msg(reattach_tasks_request_msg_t ** msg_ptr,
 
 	safe_unpack32(&msg->job_id, buffer);
 	safe_unpack32(&msg->job_step_id, buffer);
-	safe_unpack32(&msg->srun_node_id, buffer);
-	safe_unpack16(&msg->resp_port, buffer);
-	safe_unpack16(&msg->io_port, buffer);
-	safe_unpackstr_xmalloc(&msg->ofname, &uint16_tmp, buffer);
-	safe_unpackstr_xmalloc(&msg->efname, &uint16_tmp, buffer);
-	safe_unpackstr_xmalloc(&msg->ifname, &uint16_tmp, buffer);
+	safe_unpack16(&msg->num_resp_port, buffer);
+	if (msg->num_resp_port > 0) {
+		msg->resp_port = xmalloc(sizeof(uint16_t)*msg->num_resp_port);
+		for (i = 0; i < msg->num_resp_port; i++)
+			safe_unpack16(&msg->resp_port[i], buffer);
+	}
+	safe_unpack16(&msg->num_io_port, buffer);
+	if (msg->num_io_port > 0) {
+		msg->io_port = xmalloc(sizeof(uint16_t)*msg->num_io_port);
+		for (i = 0; i < msg->num_io_port; i++)
+			safe_unpack16(&msg->io_port[i], buffer);
+	}
 
 	if (!(msg->cred = slurm_cred_unpack(buffer)))
 		goto unpack_error;
diff --git a/src/slurmd/common/stepd_api.c b/src/slurmd/common/stepd_api.c
index c412f0feab6..0a61a57c9dc 100644
--- a/src/slurmd/common/stepd_api.c
+++ b/src/slurmd/common/stepd_api.c
@@ -218,6 +218,7 @@ stepd_get_info(int fd)
 	safe_read(fd, &info->uid, sizeof(uid_t));
 	safe_read(fd, &info->jobid, sizeof(uint32_t));
 	safe_read(fd, &info->stepid, sizeof(uint32_t));
+	safe_read(fd, &info->nodeid, sizeof(uint32_t));
 
 	return info;
 rwfail:
diff --git a/src/slurmd/common/stepd_api.h b/src/slurmd/common/stepd_api.h
index af5480f4c9c..ce12454dc66 100644
--- a/src/slurmd/common/stepd_api.h
+++ b/src/slurmd/common/stepd_api.h
@@ -69,6 +69,7 @@ typedef struct {
 	uid_t uid;
 	uint32_t jobid;
 	uint32_t stepid;
+	uint32_t nodeid;
 } slurmstepd_info_t;
 
 /*
diff --git a/src/slurmd/slurmd/req.c b/src/slurmd/slurmd/req.c
index 6b21305edc7..33714b2fa0e 100644
--- a/src/slurmd/slurmd/req.c
+++ b/src/slurmd/slurmd/req.c
@@ -1413,6 +1413,7 @@ _rpc_reattach_tasks(slurm_msg_t *msg)
 	uid_t             req_uid;
 	slurmstepd_info_t *step = NULL;
 	slurm_addr *cli = &msg->orig_addr;
+	uint32_t nodeid = (uint32_t)NO_VAL;
 	
 	memset(&resp_msg, 0, sizeof(slurm_msg_t));
 	fd = stepd_connect(conf->spooldir, conf->node_name,
@@ -1429,6 +1430,7 @@ _rpc_reattach_tasks(slurm_msg_t *msg)
 		rc = ESLURM_INVALID_JOB_ID;
 		goto done2;
 	} 
+	nodeid = step->nodeid;
 
 	req_uid = g_slurm_auth_get_uid(msg->auth_cred);
 	if ((req_uid != step->uid) && (!_slurm_authorized_user(req_uid))) {
@@ -1446,13 +1448,15 @@ _rpc_reattach_tasks(slurm_msg_t *msg)
 	 * Set response address by resp_port and client address
 	 */
 	memcpy(&resp_msg.address, cli, sizeof(slurm_addr));
-	slurm_set_addr(&resp_msg.address, req->resp_port, NULL); 
+	port = req->resp_port[nodeid % req->num_resp_port];
+	slurm_set_addr(&resp_msg.address, port, NULL); 
 	
 	/* 
 	 * Set IO address by io_port and client address
 	 */
 	memcpy(&ioaddr, cli, sizeof(slurm_addr));
-	slurm_set_addr(&ioaddr, req->io_port, NULL);
+	port = req->io_port[nodeid % req->num_io_port];
+	slurm_set_addr(&ioaddr, port, NULL);
 
 	/*
 	 * Get the signature of the job credential.  slurmstepd will need
@@ -1480,7 +1484,7 @@ done:
 	resp_msg.forward      = msg->forward;
 	resp_msg.ret_list     = msg->ret_list;
 	resp->node_name       = xstrdup(conf->node_name);
-	resp->srun_node_id    = req->srun_node_id;
+	resp->srun_node_id    = nodeid;
 	resp->return_code     = rc;
 
 	slurm_send_node_msg(msg->conn_fd, &resp_msg);
diff --git a/src/slurmd/slurmstepd/req.c b/src/slurmd/slurmstepd/req.c
index b1527e4893e..416dea4fde2 100644
--- a/src/slurmd/slurmstepd/req.c
+++ b/src/slurmd/slurmstepd/req.c
@@ -501,6 +501,7 @@ _handle_info(int fd, slurmd_job_t *job)
 	safe_write(fd, &job->uid, sizeof(uid_t));
 	safe_write(fd, &job->jobid, sizeof(uint32_t));
 	safe_write(fd, &job->stepid, sizeof(uint32_t));
+	safe_write(fd, &job->nodeid, sizeof(uint32_t));
 
 	return SLURM_SUCCESS;
 rwfail:
diff --git a/src/srun/reattach.c b/src/srun/reattach.c
index 9f99986dd20..45c7ac52750 100644
--- a/src/srun/reattach.c
+++ b/src/srun/reattach.c
@@ -69,7 +69,8 @@ typedef struct thd {
         pthread_attr_t	attr;			/* thread attributes */
         state_t		state;      		/* thread state */
 	slurm_msg_t    *msg;
-	srun_job_t          *job;
+	srun_job_t     *job;
+	uint32_t        nodeid;
 } thd_t;
 
 static void		 _p_reattach(slurm_msg_t *req, srun_job_t *job);
@@ -319,23 +320,14 @@ _attach_to_job(srun_job_t *job)
 
 		r->job_id          = job->jobid;
 		r->job_step_id     = job->stepid;
-		r->srun_node_id    = (uint32_t) i;
-		r->io_port         = 
-			ntohs(job->client_io->
-			      listenport[i%job->client_io->num_listen]);
-		r->resp_port       = 
-			ntohs(job->
-			      jaddr[i%job->njfds].sin_port);
+		r->num_io_port     = 1;
+		r->io_port         = (uint16_t *)xmalloc(sizeof(uint16_t));
+		r->io_port[0]      = ntohs(job->client_io->listenport[
+					   i%job->client_io->num_listen]);
+		r->num_resp_port   = 1;
+		r->resp_port	   = (uint16_t *)xmalloc(sizeof(uint16_t));
+		r->resp_port[0]    = ntohs(job->jaddr[i%job->njfds].sin_port);
 		r->cred            = job->cred;
-
-
-		/* XXX: redirecting output to files not yet
-		 * supported
-		 */
-		r->ofname          = NULL;
-		r->efname          = NULL;
-		r->ifname          = NULL;
-
 		m->data            = r;
 		m->msg_type        = REQUEST_REATTACH_TASKS;
 		forward_init(&m->forward, NULL);
@@ -368,6 +360,7 @@ _p_reattach(slurm_msg_t *msg, srun_job_t *job)
 
 		thd[i].msg = &msg[i];
 		thd[i].job = job;
+		thd[i].nodeid = i;
 
 		slurm_attr_init(&thd[i].attr);
 		if (pthread_attr_setdetachstate(&thd[i].attr,
@@ -397,8 +390,8 @@ _p_reattach_task(void *arg)
 	thd_t *t   = (thd_t *) arg;
 	int rc     = 0;
 	reattach_tasks_request_msg_t *req = t->msg->data;
-	int nodeid = req->srun_node_id; 
-	char *host = nodelist_nth_host(t->job->step_layout->node_list, nodeid);
+	char *host = nodelist_nth_host(t->job->step_layout->node_list,
+				       t->nodeid);
 	
 	t->state = THD_ACTIVE;
 	debug3("sending reattach request to %s", host);
@@ -407,10 +400,10 @@ _p_reattach_task(void *arg)
 	if (rc < 0) {
 		error("reattach: %s: %m", host);
 		t->state = THD_FAILED;
-		t->job->host_state[nodeid] = SRUN_HOST_REPLIED;
+		t->job->host_state[t->nodeid] = SRUN_HOST_REPLIED;
 	} else {
 		t->state = THD_DONE;
-		t->job->host_state[nodeid] = SRUN_HOST_UNREACHABLE;
+		t->job->host_state[t->nodeid] = SRUN_HOST_UNREACHABLE;
 	}
 	free(host);
 	slurm_mutex_lock(&active_mutex);
-- 
GitLab