From 116ff7decc92ae4711e94e519a3388ade9765788 Mon Sep 17 00:00:00 2001
From: Brian Christiansen <brian@schedmd.com>
Date: Thu, 6 Oct 2016 13:28:12 -0600
Subject: [PATCH] Thread -M<clusters> will_run calls

This is an example of how to do it. The problem is that select_jobinfo
on the job_desc is packed using working_cluster->plugin_id.
job_desc->select_jobinfo is only used by the bluegene and alps code,
which will eventually go away.
---
 slurm/slurm.h.in                 | 12 +++++
 src/api/allocate.c               | 45 +++++++++++++++++
 src/common/slurm_protocol_pack.c | 87 ++++++++++++++------------------
 src/common/slurmdb_defs.c        | 70 +++++++++++++++----------
 src/common/slurmdb_defs.h        |  7 ++-
 5 files changed, 143 insertions(+), 78 deletions(-)

diff --git a/slurm/slurm.h.in b/slurm/slurm.h.in
index 6d33508821d..2c085a03c35 100644
--- a/slurm/slurm.h.in
+++ b/slurm/slurm.h.in
@@ -3173,6 +3173,18 @@ extern int slurm_job_will_run(job_desc_msg_t *job_desc_msg);
 extern int slurm_job_will_run2(job_desc_msg_t *req,
 			       will_run_response_msg_t **will_run_resp);
 
+/*
+ * slurm_job_will_run2_addr - determine if a job would execute immediately if
+ * 	submitted now
+ * IN addr - prebuilt slurm_addr_t
+ * IN job_desc_msg - description of resource allocation request
+ * OUT will_run_resp - job run time data
+ * 	free using slurm_free_will_run_response_msg()
+ * RET 0 on success, otherwise return -1 and set errno to indicate the error
+ */
+int slurm_job_will_run2_addr(slurm_addr_t *addr, job_desc_msg_t *req,
+			     will_run_response_msg_t **will_run_resp);
+
 /*
  * slurm_sbcast_lookup - retrieve info for an existing resource allocation
  *	including a credential needed for sbcast
diff --git a/src/api/allocate.c b/src/api/allocate.c
index 4e49b958970..ad2105b150f 100644
--- a/src/api/allocate.c
+++ b/src/api/allocate.c
@@ -378,6 +378,51 @@ int slurm_job_will_run2 (job_desc_msg_t *req,
 	return SLURM_PROTOCOL_SUCCESS;
 }
 
+/*
+ * slurm_job_will_run2_addr - determine if a job would execute immediately if
+ * 	submitted now
+ * IN addr - prebuilt slurm_addr_t
+ * IN job_desc_msg - description of resource allocation request
+ * OUT will_run_resp - job run time data
+ * 	free using slurm_free_will_run_response_msg()
+ * RET 0 on success, otherwise return -1 and set errno to indicate the error
+ */
+int slurm_job_will_run2_addr(slurm_addr_t *addr, job_desc_msg_t *req,
+			     will_run_response_msg_t **will_run_resp)
+{
+	slurm_msg_t req_msg, resp_msg;
+	int rc;
+	/* req.immediate = true;    implicit */
+
+	slurm_msg_t_init(&req_msg);
+	req_msg.msg_type = REQUEST_JOB_WILL_RUN;
+	req_msg.data     = req;
+	int fd = -1;
+
+	if ((fd = slurm_open_msg_conn(addr)) < 0)
+		return SLURM_SOCKET_ERROR;
+
+	rc = slurm_send_recv_msg(fd, &req_msg, &resp_msg, 0);
+	slurm_close(fd);
+	if (rc < 0)
+		return SLURM_SOCKET_ERROR;
+
+	switch (resp_msg.msg_type) {
+	case RESPONSE_SLURM_RC:
+		if (_handle_rc_msg(&resp_msg) < 0)
+			return SLURM_PROTOCOL_ERROR;
+		break;
+	case RESPONSE_JOB_WILL_RUN:
+		*will_run_resp = (will_run_response_msg_t *) resp_msg.data;
+		break;
+	default:
+		slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR);
+		break;
+	}
+
+	return SLURM_PROTOCOL_SUCCESS;
+}
+
 /*
  * slurm_job_step_create - create a job step for a given job id
  * IN slurm_step_alloc_req_msg - description of job step request
diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c
index b0425730418..33a75ac4f36 100644
--- a/src/common/slurm_protocol_pack.c
+++ b/src/common/slurm_protocol_pack.c
@@ -8880,56 +8880,53 @@ _pack_job_desc_msg(job_desc_msg_t * job_desc_ptr, Buf buffer,
 				job_desc_ptr->select_jobinfo,
 				buffer, protocol_version);
 		} else {
-			job_desc_ptr->select_jobinfo =
-				select_g_select_jobinfo_alloc();
+			dynamic_plugin_data_t *select_jobinfo;
+			select_jobinfo = select_g_select_jobinfo_alloc();
 			if (job_desc_ptr->geometry[0] != (uint16_t) NO_VAL)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_GEOMETRY,
 					job_desc_ptr->geometry);
 
 			if (job_desc_ptr->conn_type[0] != (uint16_t) NO_VAL)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_CONN_TYPE,
 					&(job_desc_ptr->conn_type));
 			if (job_desc_ptr->reboot != (uint16_t) NO_VAL)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_REBOOT,
 					&(job_desc_ptr->reboot));
 			if (job_desc_ptr->rotate != (uint16_t) NO_VAL)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_ROTATE,
 					&(job_desc_ptr->rotate));
 			if (job_desc_ptr->blrtsimage) {
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_BLRTS_IMAGE,
 					job_desc_ptr->blrtsimage);
 			}
 			if (job_desc_ptr->linuximage)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_LINUX_IMAGE,
 					job_desc_ptr->linuximage);
 			if (job_desc_ptr->mloaderimage)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_MLOADER_IMAGE,
 					job_desc_ptr->mloaderimage);
 			if (job_desc_ptr->ramdiskimage)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_RAMDISK_IMAGE,
 					job_desc_ptr->ramdiskimage);
-			select_g_select_jobinfo_pack(
-				job_desc_ptr->select_jobinfo,
-				buffer, protocol_version);
-			select_g_select_jobinfo_free(
-				job_desc_ptr->select_jobinfo);
-			job_desc_ptr->select_jobinfo = NULL;
+			select_g_select_jobinfo_pack(select_jobinfo, buffer,
+						     protocol_version);
+			select_g_select_jobinfo_free(select_jobinfo);
 		}
 		pack16(job_desc_ptr->wait_all_nodes, buffer);
 		pack32(job_desc_ptr->bitflags, buffer);
@@ -9049,56 +9046,53 @@ _pack_job_desc_msg(job_desc_msg_t * job_desc_ptr, Buf buffer,
 				job_desc_ptr->select_jobinfo,
 				buffer, protocol_version);
 		} else {
-			job_desc_ptr->select_jobinfo =
-				select_g_select_jobinfo_alloc();
+			dynamic_plugin_data_t *select_jobinfo;
+			select_jobinfo = select_g_select_jobinfo_alloc();
 			if (job_desc_ptr->geometry[0] != (uint16_t) NO_VAL)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_GEOMETRY,
 					job_desc_ptr->geometry);
 
 			if (job_desc_ptr->conn_type[0] != (uint16_t) NO_VAL)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_CONN_TYPE,
 					&(job_desc_ptr->conn_type));
 			if (job_desc_ptr->reboot != (uint16_t) NO_VAL)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_REBOOT,
 					&(job_desc_ptr->reboot));
 			if (job_desc_ptr->rotate != (uint16_t) NO_VAL)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_ROTATE,
 					&(job_desc_ptr->rotate));
 			if (job_desc_ptr->blrtsimage) {
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_BLRTS_IMAGE,
 					job_desc_ptr->blrtsimage);
 			}
 			if (job_desc_ptr->linuximage)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_LINUX_IMAGE,
 					job_desc_ptr->linuximage);
 			if (job_desc_ptr->mloaderimage)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_MLOADER_IMAGE,
 					job_desc_ptr->mloaderimage);
 			if (job_desc_ptr->ramdiskimage)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_RAMDISK_IMAGE,
 					job_desc_ptr->ramdiskimage);
-			select_g_select_jobinfo_pack(
-				job_desc_ptr->select_jobinfo,
-				buffer, protocol_version);
-			select_g_select_jobinfo_free(
-				job_desc_ptr->select_jobinfo);
-			job_desc_ptr->select_jobinfo = NULL;
+			select_g_select_jobinfo_pack(select_jobinfo, buffer,
+						     protocol_version);
+			select_g_select_jobinfo_free(select_jobinfo);
 		}
 		pack16(job_desc_ptr->wait_all_nodes, buffer);
 		pack32(job_desc_ptr->bitflags, buffer);
@@ -9210,56 +9204,53 @@ _pack_job_desc_msg(job_desc_msg_t * job_desc_ptr, Buf buffer,
 				job_desc_ptr->select_jobinfo,
 				buffer, protocol_version);
 		} else {
-			job_desc_ptr->select_jobinfo =
-				select_g_select_jobinfo_alloc();
+			dynamic_plugin_data_t *select_jobinfo;
+			select_jobinfo = select_g_select_jobinfo_alloc();
 			if (job_desc_ptr->geometry[0] != (uint16_t) NO_VAL)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_GEOMETRY,
 					job_desc_ptr->geometry);
 
 			if (job_desc_ptr->conn_type[0] != (uint16_t) NO_VAL)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_CONN_TYPE,
 					&(job_desc_ptr->conn_type));
 			if (job_desc_ptr->reboot != (uint16_t) NO_VAL)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_REBOOT,
 					&(job_desc_ptr->reboot));
 			if (job_desc_ptr->rotate != (uint16_t) NO_VAL)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_ROTATE,
 					&(job_desc_ptr->rotate));
 			if (job_desc_ptr->blrtsimage) {
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_BLRTS_IMAGE,
 					job_desc_ptr->blrtsimage);
 			}
 			if (job_desc_ptr->linuximage)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_LINUX_IMAGE,
 					job_desc_ptr->linuximage);
 			if (job_desc_ptr->mloaderimage)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_MLOADER_IMAGE,
 					job_desc_ptr->mloaderimage);
 			if (job_desc_ptr->ramdiskimage)
 				select_g_select_jobinfo_set(
-					job_desc_ptr->select_jobinfo,
+					select_jobinfo,
 					SELECT_JOBDATA_RAMDISK_IMAGE,
 					job_desc_ptr->ramdiskimage);
-			select_g_select_jobinfo_pack(
-				job_desc_ptr->select_jobinfo,
-				buffer, protocol_version);
-			select_g_select_jobinfo_free(
-				job_desc_ptr->select_jobinfo);
-			job_desc_ptr->select_jobinfo = NULL;
+			select_g_select_jobinfo_pack(select_jobinfo, buffer,
+						     protocol_version);
+			select_g_select_jobinfo_free(select_jobinfo);
 		}
 		pack16(job_desc_ptr->wait_all_nodes, buffer);
 		pack32(job_desc_ptr->bitflags, buffer);
diff --git a/src/common/slurmdb_defs.c b/src/common/slurmdb_defs.c
index 06e475d421a..ad8e38c2421 100644
--- a/src/common/slurmdb_defs.c
+++ b/src/common/slurmdb_defs.c
@@ -411,29 +411,28 @@ static int _sort_local_cluster(void *v1, void *v2)
 	return 0;
 }
 
-static local_cluster_rec_t * _job_will_run (job_desc_msg_t *req)
+static void * _job_will_run (void *arg)
 {
-	local_cluster_rec_t *local_cluster = NULL;
+	local_cluster_rec_t *local_cluster = (local_cluster_rec_t *)arg;
+	slurmdb_cluster_rec_t *cluster_rec = local_cluster->cluster_rec;
 	will_run_response_msg_t *will_run_resp;
 	char buf[64];
 	int rc;
 	uint32_t cluster_flags = slurmdb_setup_cluster_flags();
 	char *type = "processors";
 
-	rc = slurm_job_will_run2(req, &will_run_resp);
-
-	if (rc >= 0) {
+	if (!(rc = slurm_job_will_run2_addr(&cluster_rec->control_addr,
+					    local_cluster->job_desc,
+					    &will_run_resp))) {
 		if (cluster_flags & CLUSTER_FLAG_BG)
 			type = "cnodes";
 		slurm_make_time_str(&will_run_resp->start_time,
 				    buf, sizeof(buf));
 		debug("Job %u to start at %s on cluster %s using %u %s on %s",
-		      will_run_resp->job_id, buf, working_cluster_rec->name,
+		      will_run_resp->job_id, buf, cluster_rec->name,
 		      will_run_resp->proc_cnt, type,
 		      will_run_resp->node_list);
 
-		local_cluster = xmalloc(sizeof(local_cluster_rec_t));
-		local_cluster->cluster_rec = working_cluster_rec;
 		local_cluster->start_time = will_run_resp->start_time;
 
 		if (will_run_resp->preemptee_job_id) {
@@ -458,7 +457,9 @@ static local_cluster_rec_t * _job_will_run (job_desc_msg_t *req)
 		slurm_free_will_run_response_msg(will_run_resp);
 	}
 
-	return local_cluster;
+	local_cluster->thread_rc = rc;
+
+	return NULL;
 }
 
 static int _set_qos_bit_from_string(bitstr_t *valid_qos, char *name)
@@ -3012,13 +3013,15 @@ extern int slurmdb_get_first_avail_cluster(job_desc_msg_t *req,
 	char *cluster_names, slurmdb_cluster_rec_t **cluster_rec)
 {
 	local_cluster_rec_t *local_cluster = NULL;
+	slurmdb_cluster_rec_t *tmp_cluster;
 	int rc = SLURM_SUCCESS;
 	char buf[64];
 	bool host_set = false;
-	ListIterator itr;
+	ListIterator itr, resp_itr;
 	List cluster_list = NULL;
 	List ret_list = NULL;
 	List tried_feds = list_create(NULL);
+	pthread_attr_t attr;
 
 	*cluster_rec = NULL;
 	cluster_list = slurmdb_get_info_cluster(cluster_names);
@@ -3038,37 +3041,48 @@ extern int slurmdb_get_first_avail_cluster(job_desc_msg_t *req,
 		host_set = true;
 	}
 
-	if (working_cluster_rec)
-		*cluster_rec = working_cluster_rec;
-
+	slurm_attr_init(&attr);
 	ret_list = list_create(_destroy_local_cluster_rec);
 	itr = list_iterator_create(cluster_list);
-	while ((working_cluster_rec = list_next(itr))) {
+	while ((tmp_cluster = list_next(itr))) {
 
 		/* only try one cluster from each federation */
-		if (working_cluster_rec->fed.name &&
+		if (tmp_cluster->fed.id &&
 		    list_find_first(tried_feds, _find_char_in_list,
-				    working_cluster_rec->fed.name))
+				    tmp_cluster->fed.name))
 			continue;
 
-		if ((local_cluster = _job_will_run(req))) {
-			list_append(ret_list, local_cluster);
-			if (working_cluster_rec->fed.name)
-				list_append(tried_feds,
-					    working_cluster_rec->fed.name);
-		} else {
-			error("Problem with submit to cluster %s: %m",
-			      working_cluster_rec->name);
+		local_cluster_rec_t *local_cluster =
+			xmalloc(sizeof(local_cluster_rec_t));
+		local_cluster->cluster_rec = tmp_cluster;
+		local_cluster->job_desc    = req;
+
+		if (pthread_create(&local_cluster->thread_id, &attr,
+				   _job_will_run, local_cluster) != 0) {
+			error("failed to create _job_will_run thread");
+			xfree(local_cluster);
+			continue;
 		}
+
+		list_append(ret_list, local_cluster);
+		if (tmp_cluster->fed.id)
+			list_append(tried_feds, tmp_cluster->fed.name);
 	}
 	list_iterator_destroy(itr);
 	FREE_NULL_LIST(tried_feds);
+	slurm_attr_destroy(&attr);
 
-	/* restore working_cluster_rec in case it was already set */
-	if (*cluster_rec) {
-		working_cluster_rec = *cluster_rec;
-		*cluster_rec = NULL;
+	resp_itr = list_iterator_create(ret_list);
+	while ((local_cluster = list_next(resp_itr))) {
+		pthread_join(local_cluster->thread_id, NULL);
+
+		if (local_cluster->thread_rc) {
+			error("Problem with submit to cluster %s: %m",
+			      local_cluster->cluster_rec->name);
+			list_delete_item(resp_itr);
+		}
 	}
+	list_iterator_destroy(resp_itr);
 
 	if (host_set)
 		req->alloc_node = NULL;
diff --git a/src/common/slurmdb_defs.h b/src/common/slurmdb_defs.h
index 3305779da29..5d939198ca7 100644
--- a/src/common/slurmdb_defs.h
+++ b/src/common/slurmdb_defs.h
@@ -96,8 +96,11 @@ typedef enum {
 
 typedef struct {
 	slurmdb_cluster_rec_t *cluster_rec;
-	int preempt_cnt;
-	time_t start_time;
+	job_desc_msg_t        *job_desc;
+	int                    preempt_cnt;
+	time_t                 start_time;
+	pthread_t              thread_id;
+	int                    thread_rc;
 } local_cluster_rec_t;
 
 extern slurmdb_job_rec_t *slurmdb_create_job_rec();
-- 
GitLab