From 27e0b0f1113d2dcb51d76afaf9d2fca4cb43d63b Mon Sep 17 00:00:00 2001
From: Morris Jette <jette@schedmd.com>
Date: Thu, 25 May 2017 14:44:49 -0600
Subject: [PATCH] Return Pack Job Response after allocation

---
 src/api/allocate.c        | 102 ++++++++++++++++++++++----------------
 src/salloc/salloc.c       |   7 +--
 src/slurmctld/proc_req.c  |  13 +++++
 src/slurmctld/srun_comm.c |  39 +++++++++++----
 4 files changed, 107 insertions(+), 54 deletions(-)

diff --git a/src/api/allocate.c b/src/api/allocate.c
index f0b95b85fe3..f2fd34e467e 100644
--- a/src/api/allocate.c
+++ b/src/api/allocate.c
@@ -78,8 +78,10 @@ typedef struct {
 static int _handle_rc_msg(slurm_msg_t *msg);
 static listen_t *_create_allocation_response_socket(char *interface_hostname);
 static void _destroy_allocation_response_socket(listen_t *listen);
-static resource_allocation_response_msg_t *_wait_for_allocation_response(
-	uint32_t job_id, const listen_t *listen, int timeout);
+static void _wait_for_allocation_response(uint32_t job_id,
+					  const listen_t *listen,
+					  uint16_t msg_type, int timeout,
+					  void **resp);
 
 /*
  * slurm_allocate_resources - allocate resources for a job request
@@ -256,8 +258,9 @@ slurm_allocate_resources_blocking (const job_desc_msg_t *user_req,
 			slurm_free_resource_allocation_response_msg(resp);
 			if (pending_callback != NULL)
 				pending_callback(job_id);
- 			resp = _wait_for_allocation_response(job_id, listen,
-							     timeout);
+			_wait_for_allocation_response(job_id, listen,
+						RESPONSE_RESOURCE_ALLOCATION,
+						timeout, (void **) &resp);
 			/* If NULL, we didn't get the allocation in
 			   the time desired, so just free the job id */
 			if ((resp == NULL) && (errno != ESLURM_ALREADY_DONE)) {
@@ -281,6 +284,26 @@ slurm_allocate_resources_blocking (const job_desc_msg_t *user_req,
 	return resp;
 }
 
+/* Get total node cound and lead job ID from RESPONSE_JOB_PACK_ALLOCATION */
+static void _pack_alloc_test(List resp, uint32_t *node_cnt, uint32_t *job_id)
+{
+	resource_allocation_response_msg_t *alloc;
+	uint32_t pack_node_cnt = 0, pack_job_id = 0;
+	ListIterator iter;
+
+	xassert(resp);
+	iter = list_iterator_create(resp);
+	while ((alloc = (resource_allocation_response_msg_t *)list_next(iter))){
+		pack_node_cnt += alloc->node_cnt;
+		if (pack_job_id == 0)
+			pack_job_id = alloc->job_id;
+	}
+	list_iterator_destroy(iter);
+
+	*job_id   = pack_job_id;
+	*node_cnt = pack_node_cnt;
+}
+
 /*
  * slurm_allocate_pack_job_blocking
  *	allocate resources for a list of job requests.  This call will block
@@ -314,6 +337,7 @@ List slurm_allocate_pack_job_blocking(List job_req_list, time_t timeout,
 	ListIterator iter;
 	bool immediate_flag = false;
 	bool immediate_logged = false;
+	uint32_t node_cnt = 0, job_id = 0;
 
 	slurm_msg_t_init(&req_msg);
 	slurm_msg_t_init(&resp_msg);
@@ -321,11 +345,20 @@ List slurm_allocate_pack_job_blocking(List job_req_list, time_t timeout,
 	/*
 	 * set node name and session ID for this request
 	 */
+
+	if (!immediate_flag) {
+		listen = _create_allocation_response_socket(local_hostname);
+		if (listen == NULL)
+			return NULL;
+	}
+
 	local_hostname = xshort_hostname();
 	iter = list_iterator_create(job_req_list);
 	while ((req = (job_desc_msg_t *) list_next(iter))) {
 		if (req->alloc_sid == NO_VAL)
 			req->alloc_sid = getsid(0);
+		if (listen)
+			req->alloc_resp_port = listen->port;
 
 		if (!req->alloc_node) {
 			if (local_hostname) {
@@ -344,19 +377,6 @@ List slurm_allocate_pack_job_blocking(List job_req_list, time_t timeout,
 	}
 	list_iterator_destroy(iter);
 
-	if (!immediate_flag) {
-		listen = _create_allocation_response_socket(local_hostname);
-		if (listen == NULL) {
-			xfree(local_hostname);
-			return NULL;
-		}
-
-		iter = list_iterator_create(job_req_list);
-		while ((req = (job_desc_msg_t *)list_next(iter)))
-			req->alloc_resp_port = listen->port;
-		list_iterator_destroy(iter);
-	}
-
 	req_msg.msg_type = REQUEST_JOB_PACK_ALLOCATION;
 	req_msg.data     = job_req_list;
 
@@ -394,21 +414,20 @@ List slurm_allocate_pack_job_blocking(List job_req_list, time_t timeout,
 		/* Yay, the controller has acknowledged our request!
 		 * Test if we have an allocation yet? */
 		resp = (List) resp_msg.data;
-#if 0
-//FIXME: Update for a list
-		if (resp->node_cnt > 0) {
+		_pack_alloc_test(resp, &node_cnt, &job_id);
+		if (node_cnt > 0) {
 			/* yes, allocation has been granted */
 			errno = SLURM_PROTOCOL_SUCCESS;
-		} else if (!req->immediate) {
-			if (resp->error_code != SLURM_SUCCESS)
-				info("%s", slurm_strerror(resp->error_code));
+		} else if (immediate_flag) {
+			debug("Immediate allocation not granted");
+		} else {
 			/* no, we need to wait for a response */
-			job_id = resp->job_id;
-			slurm_free_resource_allocation_response_msg(resp);
+			FREE_NULL_LIST(resp);
 			if (pending_callback != NULL)
 				pending_callback(job_id);
- 			resp = _wait_for_allocation_response(job_id, listen,
-							     timeout);
+			_wait_for_allocation_response(job_id, listen,
+						RESPONSE_JOB_PACK_ALLOCATION,
+						timeout, (void **) &resp);
 			/* If NULL, we didn't get the allocation in
 			 * the time desired, so just free the job id */
 			if ((resp == NULL) && (errno != ESLURM_ALREADY_DONE)) {
@@ -416,7 +435,6 @@ List slurm_allocate_pack_job_blocking(List job_req_list, time_t timeout,
 				slurm_complete_job(job_id, -1);
 			}
 		}
-#endif
 		break;
 	default:
 		errnum = SLURM_UNEXPECTED_MSG_ERROR;
@@ -993,18 +1011,17 @@ static int _wait_for_alloc_rpc(const listen_t *listen, int sleep_time)
 	return 0;
 }
 
-static resource_allocation_response_msg_t *
-_wait_for_allocation_response(uint32_t job_id, const listen_t *listen,
-			      int timeout)
+static void _wait_for_allocation_response(uint32_t job_id,
+					  const listen_t *listen,
+					  uint16_t msg_type, int timeout,
+					  void **resp)
 {
-	resource_allocation_response_msg_t *resp = NULL;
 	int errnum, rc;
 
 	info("job %u queued and waiting for resources", job_id);
-	if ((rc = _wait_for_alloc_rpc(listen, timeout)) == 1) {
-		rc = _accept_msg_connection(listen->fd,
-				RESPONSE_RESOURCE_ALLOCATION, (void **) &resp);
-	}
+	*resp = NULL;
+	if ((rc = _wait_for_alloc_rpc(listen, timeout)) == 1)
+		rc = _accept_msg_connection(listen->fd, msg_type, resp);
 	if (rc <= 0) {
 		errnum = errno;
 		/* Maybe the resource allocation response RPC got lost
@@ -1012,19 +1029,20 @@ _wait_for_allocation_response(uint32_t job_id, const listen_t *listen,
 		 * Let's see if the controller thinks that the allocation
 		 * has been granted.
 		 */
-		if (slurm_allocation_lookup(job_id, &resp) >= 0) {
-			return resp;
-		}
+//FIXME
+//		if (slurm_allocation_lookup(job_id, resp) >= 0) {
+//			return resp;
+//		}
 		if (slurm_get_errno() == ESLURM_JOB_PENDING) {
 			debug3("Still waiting for allocation");
 			errno = errnum;
-			return NULL;
+			return;
 		} else {
 			debug3("Unable to confirm allocation for job %u: %m",
 			       job_id);
-			return NULL;
+			return;
 		}
 	}
 	info("job %u has been allocated resources", job_id);
-	return resp;
+	return;
 }
diff --git a/src/salloc/salloc.c b/src/salloc/salloc.c
index d79d3401b1f..2149031239e 100644
--- a/src/salloc/salloc.c
+++ b/src/salloc/salloc.c
@@ -413,18 +413,19 @@ int main(int argc, char **argv)
 		slurm_allocation_msg_thr_destroy(msg_thr);
 		exit(error_exit);
 	} else if (job_resp_list && !allocation_interrupted) {
+		/* Allocation granted to regular job */
 		ListIterator iter;
 		iter = list_iterator_create(job_resp_list);
 		while ((alloc = (resource_allocation_response_msg_t *)
 				list_next(iter))) {
 			info("Granted job allocation %u", alloc->job_id);
+info("  Nodes %s", alloc->node_list);
 		}
 		list_iterator_destroy(iter);
+//FIXME, print one job ID
 exit(1);
 	} else if (!allocation_interrupted) {
-		/*
-		 * Allocation granted!
-		 */
+		/* Allocation granted to regular job */
 		info("Granted job allocation %u", alloc->job_id);
 		pending_job_id = alloc->job_id;
 
diff --git a/src/slurmctld/proc_req.c b/src/slurmctld/proc_req.c
index be3e6e576ec..e4c05d56fc1 100644
--- a/src/slurmctld/proc_req.c
+++ b/src/slurmctld/proc_req.c
@@ -1153,6 +1153,9 @@ static void _slurm_rpc_allocate_pack(slurm_msg_t * msg)
 	hostset_t jobid_hostset = NULL;
 	char tmp_str[32];
 	List resp = NULL;
+	slurm_addr_t resp_addr;
+	char resp_host[16];
+	uint16_t port;	/* dummy value */
 
 	START_TIMER;
 
@@ -1167,6 +1170,14 @@ static void _slurm_rpc_allocate_pack(slurm_msg_t * msg)
 		error_code = SLURM_ERROR;
 		goto send_msg;
 	}
+	if (slurm_get_peer_addr(msg->conn_fd, &resp_addr) == 0) {
+		slurm_get_ip_str(&resp_addr, &port,resp_host,sizeof(resp_host));
+	} else {
+		info("REQUEST_JOB_PACK_ALLOCATION from uid=%d , can't get peer addr",
+		     uid);
+		error_code = SLURM_ERROR;
+		goto send_msg;
+	}
 
 	debug2("sched: Processing RPC: REQUEST_JOB_PACK_ALLOCATION from uid=%d",
 	       uid);
@@ -1225,6 +1236,8 @@ static void _slurm_rpc_allocate_pack(slurm_msg_t * msg)
 		job_ptr = NULL;
 		job_desc_msg->begin_time = MAX(job_desc_msg->begin_time,
 					       min_begin);
+		if (!job_desc_msg->resp_host)
+			job_desc_msg->resp_host = xstrdup(resp_host);
 		error_code = job_allocate(job_desc_msg, false, false, NULL,
 					  true, uid, &job_ptr, &err_msg,
 					  msg->protocol_version);
diff --git a/src/slurmctld/srun_comm.c b/src/slurmctld/srun_comm.c
index 9963c15c1c3..7e7fcb684ca 100644
--- a/src/slurmctld/srun_comm.c
+++ b/src/slurmctld/srun_comm.c
@@ -153,27 +153,48 @@ resource_allocation_response_msg_t *_build_alloc_msg(struct job_record *job_ptr)
 extern void srun_allocate (uint32_t job_id)
 {
 	struct job_record *job_ptr = find_job_record(job_id);
+	struct job_record *pack_job, *pack_leader;
+	resource_allocation_response_msg_t *msg_arg;
+	slurm_addr_t *addr;
+	ListIterator iter;
+	List job_resp_list = NULL;
 
 	xassert(job_ptr);
-	if (job_ptr && job_ptr->alloc_resp_port && job_ptr->alloc_node &&
-	    job_ptr->resp_host && job_ptr->job_resrcs &&
-	    job_ptr->job_resrcs->cpu_array_cnt) {
-		slurm_addr_t *addr;
-		resource_allocation_response_msg_t *msg_arg;
-
-		if (_pending_pack_jobs(job_ptr))
-			return;
+	if (!job_ptr || !job_ptr->alloc_resp_port || !job_ptr->alloc_node ||
+	    !job_ptr->resp_host || !job_ptr->job_resrcs ||
+	    !job_ptr->job_resrcs->cpu_array_cnt)
+		return;
 
+	if (job_ptr->pack_job_id == 0) {
 		addr = xmalloc(sizeof(struct sockaddr_in));
 		slurm_set_addr(addr, job_ptr->alloc_resp_port,
 			job_ptr->resp_host);
 		msg_arg = _build_alloc_msg(job_ptr);
 		set_remote_working_response(msg_arg, job_ptr,
 					    job_ptr->origin_cluster);
-
 		_srun_agent_launch(addr, job_ptr->alloc_node,
 				   RESPONSE_RESOURCE_ALLOCATION, msg_arg,
 				   job_ptr->start_protocol_ver);
+	} else if (_pending_pack_jobs(job_ptr)) {
+		return;
+	} else {
+		addr = xmalloc(sizeof(struct sockaddr_in));
+		pack_leader = find_job_record(job_ptr->pack_job_id);
+		slurm_set_addr(addr, pack_leader->alloc_resp_port,
+			       pack_leader->resp_host);
+//FIXME: Need destroy function
+		job_resp_list = list_create(NULL);
+		iter = list_iterator_create(pack_leader->pack_job_list);
+		while ((pack_job = (struct job_record *) list_next(iter))) {
+			msg_arg = _build_alloc_msg(pack_job);
+			set_remote_working_response(msg_arg, pack_job,
+						    pack_job->origin_cluster);
+			list_append(job_resp_list, msg_arg);
+		}
+		list_iterator_destroy(iter);
+		_srun_agent_launch(addr, job_ptr->alloc_node,
+				   RESPONSE_JOB_PACK_ALLOCATION, job_resp_list,
+				   job_ptr->start_protocol_ver);
 	}
 }
 
-- 
GitLab