From 6a55becce1200dbd3270585d8d9de4bf5bd7cd66 Mon Sep 17 00:00:00 2001
From: Danny Auble <da@schedmd.com>
Date: Fri, 10 Jul 2015 10:57:52 -0700
Subject: [PATCH] Add step complete to message aggregation.  This does slow
 things down a little by adding extra hops, but if you are running many small
 steps it can greatly reduce the number of messages sent to the controller.

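With aggregation enabled, the base-of-tree slurmstepd now sends
REQUEST_STEP_COMPLETE_AGGR to its local slurmd instead of contacting the
slurmctld directly; the slurmd either queues the completion on its
aggregation list (msg_aggr_window_msgs > 1) or forwards a plain
REQUEST_STEP_COMPLETE to the controller itself.

For reference, a hypothetical slurm.conf setting that turns aggregation on
(the WindowMsgs/WindowTime syntax and the values shown are assumed from the
existing msg_aggr support, not something introduced by this patch):

    # example only: aggregate up to 24 RPCs or flush every 200 ms
    MsgAggregationParams=WindowMsgs=24,WindowTime=200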
---
 src/common/slurm_protocol_defs.c |  3 ++
 src/common/slurm_protocol_defs.h |  1 +
 src/common/slurm_protocol_pack.c |  2 ++
 src/slurmctld/proc_req.c         | 34 +++++++++++++-------
 src/slurmd/slurmd/req.c          | 54 ++++++++++++++++++++++++++++++++
 src/slurmd/slurmstepd/mgr.c      | 22 ++++++++++++-
 6 files changed, 104 insertions(+), 12 deletions(-)

diff --git a/src/common/slurm_protocol_defs.c b/src/common/slurm_protocol_defs.c
index 58e0ead5915..8bca807c071 100644
--- a/src/common/slurm_protocol_defs.c
+++ b/src/common/slurm_protocol_defs.c
@@ -3361,6 +3361,7 @@ extern int slurm_free_msg_data(slurm_msg_type_t type, void *data)
 		slurm_free_block_info_request_msg(data);
 		break;
 	case REQUEST_STEP_COMPLETE:
+	case REQUEST_STEP_COMPLETE_AGGR:
 		slurm_free_step_complete_msg(data);
 		break;
 	case RESPONSE_JOB_STEP_STAT:
@@ -3813,6 +3814,8 @@ rpc_num2string(uint16_t opcode)
 		return "RESPONSE_SUSPEND";
 	case REQUEST_STEP_COMPLETE:
 		return "REQUEST_STEP_COMPLETE";
+	case REQUEST_STEP_COMPLETE_AGGR:
+		return "REQUEST_STEP_COMPLETE_AGGR";
 	case REQUEST_COMPLETE_JOB_ALLOCATION:
 		return "REQUEST_COMPLETE_JOB_ALLOCATION";
 	case REQUEST_COMPLETE_BATCH_SCRIPT:
diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h
index de3429cca4e..4f4fa2f3326 100644
--- a/src/common/slurm_protocol_defs.h
+++ b/src/common/slurm_protocol_defs.h
@@ -326,6 +326,7 @@ typedef enum {
 	RESPONSE_JOB_ARRAY_ERRORS,
 	REQUEST_NETWORK_CALLERID,
 	RESPONSE_NETWORK_CALLERID,
+	REQUEST_STEP_COMPLETE_AGGR,
 
 	REQUEST_LAUNCH_TASKS = 6001,
 	RESPONSE_LAUNCH_TASKS,
diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c
index 405a39a9ea7..40996b394fe 100644
--- a/src/common/slurm_protocol_pack.c
+++ b/src/common/slurm_protocol_pack.c
@@ -1266,6 +1266,7 @@ pack_msg(slurm_msg_t const *msg, Buf buffer)
 			msg->protocol_version);
 		break;
 	case REQUEST_STEP_COMPLETE:
+	case REQUEST_STEP_COMPLETE_AGGR:
 		_pack_step_complete_msg((step_complete_msg_t *)msg->data,
 					buffer,
 					msg->protocol_version);
@@ -1930,6 +1931,7 @@ unpack_msg(slurm_msg_t * msg, Buf buffer)
 			msg->protocol_version);
 		break;
 	case REQUEST_STEP_COMPLETE:
+	case REQUEST_STEP_COMPLETE_AGGR:
 		rc = _unpack_step_complete_msg((step_complete_msg_t
 						**) & (msg->data),
 					       buffer,
diff --git a/src/slurmctld/proc_req.c b/src/slurmctld/proc_req.c
index c9c7e833c2b..c5f0d0005b7 100644
--- a/src/slurmctld/proc_req.c
+++ b/src/slurmctld/proc_req.c
@@ -183,7 +183,7 @@ inline static void  _slurm_rpc_set_schedlog_level(slurm_msg_t *msg);
 inline static void  _slurm_rpc_shutdown_controller(slurm_msg_t * msg);
 inline static void  _slurm_rpc_shutdown_controller_immediate(slurm_msg_t *
 							     msg);
-inline static void  _slurm_rpc_step_complete(slurm_msg_t * msg);
+inline static void  _slurm_rpc_step_complete(slurm_msg_t * msg, bool locked);
 inline static void  _slurm_rpc_step_layout(slurm_msg_t * msg);
 inline static void  _slurm_rpc_step_update(slurm_msg_t * msg);
 inline static void  _slurm_rpc_submit_batch_job(slurm_msg_t * msg);
@@ -491,7 +491,7 @@ void slurmctld_req(slurm_msg_t *msg, connection_arg_t *arg)
 		/* No body to free */
 		break;
 	case REQUEST_STEP_COMPLETE:
-		_slurm_rpc_step_complete(msg);
+		_slurm_rpc_step_complete(msg, 0);
 		slurm_free_step_complete_msg(msg->data);
 		break;
 	case REQUEST_STEP_LAYOUT:
@@ -3089,7 +3089,7 @@ static void _slurm_rpc_shutdown_controller_immediate(slurm_msg_t * msg)
  *      completion of a job step on at least some nodes.
  *	If the job step is complete, it may
  *	represent the termination of an entire job */
-static void _slurm_rpc_step_complete(slurm_msg_t *msg)
+static void _slurm_rpc_step_complete(slurm_msg_t *msg, bool locked)
 {
 	static int active_rpc_cnt = 0;
 	int error_code = SLURM_SUCCESS, rc, rem;
@@ -3110,14 +3110,19 @@ static void _slurm_rpc_step_complete(slurm_msg_t *msg)
 		     req->job_id, req->job_step_id, req->range_first,
 		     req->range_last, req->step_rc, uid);
 
-	_throttle_start(&active_rpc_cnt);
-	lock_slurmctld(job_write_lock);
+	if (!locked) {
+		_throttle_start(&active_rpc_cnt);
+		lock_slurmctld(job_write_lock);
+	}
+
 	rc = step_partial_comp(req, uid, &rem, &step_rc);
 
 	if (rc || rem) {	/* some error or not totally done */
 		/* Note: Error printed within step_partial_comp */
-		unlock_slurmctld(job_write_lock);
-		_throttle_fini(&active_rpc_cnt);
+		if (!locked) {
+			unlock_slurmctld(job_write_lock);
+			_throttle_fini(&active_rpc_cnt);
+		}
 		slurm_send_rc_msg(msg, rc);
 		if (!rc)	/* partition completion */
 			schedule_job_save();	/* Has own locking */
@@ -3128,8 +3133,10 @@ static void _slurm_rpc_step_complete(slurm_msg_t *msg)
 		/* FIXME: test for error, possibly cause batch job requeue */
 		error_code = job_complete(req->job_id, uid, false,
 					  false, step_rc);
-		unlock_slurmctld(job_write_lock);
-		_throttle_fini(&active_rpc_cnt);
+		if (!locked) {
+			unlock_slurmctld(job_write_lock);
+			_throttle_fini(&active_rpc_cnt);
+		}
 		END_TIMER2("_slurm_rpc_step_complete");
 
 		/* return result */
@@ -3148,8 +3155,10 @@ static void _slurm_rpc_step_complete(slurm_msg_t *msg)
 	} else {
 		error_code = job_step_complete(req->job_id, req->job_step_id,
 					       uid, false, step_rc);
-		unlock_slurmctld(job_write_lock);
-		_throttle_fini(&active_rpc_cnt);
+		if (!locked) {
+			unlock_slurmctld(job_write_lock);
+			_throttle_fini(&active_rpc_cnt);
+		}
 		END_TIMER2("_slurm_rpc_step_complete");
 
 		/* return result */
@@ -5720,6 +5729,9 @@ static void  _slurm_rpc_comp_msg_list(composite_msg_t * comp_msg,
 		case REQUEST_COMPLETE_BATCH_JOB:
 			_slurm_rpc_complete_batch_script(next_msg, 1);
 			break;
+		case REQUEST_STEP_COMPLETE:
+			_slurm_rpc_step_complete(next_msg, 1);
+			break;
 		case MESSAGE_EPILOG_COMPLETE:
 			_slurm_rpc_epilog_complete(next_msg, run_scheduler, 1);
 			break;
diff --git a/src/slurmd/slurmd/req.c b/src/slurmd/slurmd/req.c
index b34686750d6..f1bea1d9488 100644
--- a/src/slurmd/slurmd/req.c
+++ b/src/slurmd/slurmd/req.c
@@ -188,6 +188,7 @@ static int  _rpc_health_check(slurm_msg_t *);
 static int  _rpc_acct_gather_update(slurm_msg_t *);
 static int  _rpc_acct_gather_energy(slurm_msg_t *);
 static int  _rpc_step_complete(slurm_msg_t *msg);
+static int  _rpc_step_complete_aggr(slurm_msg_t *msg);
 static int  _rpc_stat_jobacct(slurm_msg_t *msg);
 static int  _rpc_list_pids(slurm_msg_t *msg);
 static int  _rpc_daemon_status(slurm_msg_t *msg);
@@ -431,6 +432,10 @@ slurmd_req(slurm_msg_t *msg)
 		(void) _rpc_step_complete(msg);
 		slurm_free_step_complete_msg(msg->data);
 		break;
+	case REQUEST_STEP_COMPLETE_AGGR:
+		(void) _rpc_step_complete_aggr(msg);
+		slurm_free_step_complete_msg(msg->data);
+		break;
 	case REQUEST_JOB_STEP_STAT:
 		(void) _rpc_stat_jobacct(msg);
 		slurm_free_job_step_id_msg(msg->data);
@@ -2849,6 +2854,55 @@ done:
 	return rc;
 }
 
+static void _setup_step_complete_msg(slurm_msg_t *msg, void *data)
+{
+	slurm_msg_t_init(msg);
+	msg->msg_type = REQUEST_STEP_COMPLETE;
+	msg->data = data;
+}
+
+/* This step_complete RPC came from slurmstepd because message
+ * aggregation is configured and we are at the head of the tree.
+ * This just adds the message to the list and goes on its merry way. */
+static int
+_rpc_step_complete_aggr(slurm_msg_t *msg)
+{
+	int rc;
+	uid_t uid = g_slurm_auth_get_uid(msg->auth_cred, NULL);
+
+	if (!_slurm_authorized_user(uid)) {
+		error("Security violation: step_complete_aggr from uid %d",
+		      uid);
+		if (msg->conn_fd >= 0)
+			slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
+		return SLURM_ERROR;
+	}
+
+	if (conf->msg_aggr_window_msgs > 1) {
+		slurm_msg_t *req = xmalloc_nz(sizeof(slurm_msg_t));
+		_setup_step_complete_msg(req, msg->data);
+		msg->data = NULL;
+
+		msg_aggr_add_msg(req, 1, NULL);
+	} else {
+		slurm_msg_t req;
+		_setup_step_complete_msg(&req, msg->data);
+
+		while (slurm_send_recv_controller_rc_msg(&req, &rc) < 0) {
+			error("Unable to send step complete, "
+			      "trying again in a minute: %m");
+		}
+	}
+
+	/* Finish communication with the stepd; we have to wait for
+	 * the message back from the slurmctld first or we will cause a
+	 * race condition with srun.
+	 */
+	slurm_send_rc_msg(msg, SLURM_SUCCESS);
+
+	return SLURM_SUCCESS;
+}
+
 /* Get list of active jobs and steps, xfree returned value */
 static char *
 _get_step_list(void)
diff --git a/src/slurmd/slurmstepd/mgr.c b/src/slurmd/slurmstepd/mgr.c
index c03b34914a8..a6ff2ff0665 100644
--- a/src/slurmd/slurmstepd/mgr.c
+++ b/src/slurmd/slurmstepd/mgr.c
@@ -850,7 +850,27 @@ _one_step_complete_msg(stepd_step_rec_t *job, int first, int last)
 		/* on error AGAIN, send to the slurmctld instead */
 		debug3("Rank %d sending complete to slurmctld instead, range "
 		       "%d to %d", step_complete.rank, first, last);
-	} else {
+	} else if (conf->msg_aggr_window_msgs > 1) {
+		/* this is the base of the tree, its parent is slurmctld,
+		 * but message aggregation is enabled so send the
+		 * completion to the local slurmd to forward for us */
+		debug3("Rank %d sending complete to slurmd for message aggr, "
+		       "range %d to %d",
+		       step_complete.rank, first, last);
+		req.msg_type = REQUEST_STEP_COMPLETE_AGGR;
+		slurm_set_addr_char(&req.address, conf->port, "localhost");
+		for (i = 0; i <= REVERSE_TREE_PARENT_RETRY; i++) {
+			if (i)
+				sleep(1);
+			retcode = slurm_send_recv_rc_msg_only_one(&req, &rc, 0);
+			if ((retcode == 0) && (rc == 0))
+				goto finished;
+		}
+		req.msg_type = REQUEST_STEP_COMPLETE;
+		/* on failure, fall back to sending to the slurmctld directly */
+		debug3("Rank %d sending complete to slurmctld instead, range "
+		       "%d to %d", step_complete.rank, first, last);
+	} else {
 		/* this is the base of the tree, its parent is slurmctld */
 		debug3("Rank %d sending complete to slurmctld, range %d to %d",
 		       step_complete.rank, first, last);
-- 
GitLab