From bdc85810114ddb03031f8f4294f932dbcb16c0ed Mon Sep 17 00:00:00 2001
From: Moe Jette <jette1@llnl.gov>
Date: Thu, 21 Feb 2008 20:47:07 +0000
Subject: [PATCH] svn merge -r13308:13326
 https://eris.llnl.gov/svn/slurm/branches/slurm-1.2

---
 NEWS                               |  4 ++
 src/slurmd/slurmd/req.c            | 73 ++++++++++++++++++++++++++++--
 src/slurmd/slurmstepd/mgr.c        | 13 ++++--
 src/slurmd/slurmstepd/req.c        |  7 +++
 src/slurmd/slurmstepd/slurmstepd.h |  1 +
 5 files changed, 92 insertions(+), 6 deletions(-)

diff --git a/NEWS b/NEWS
index 87e6d8fbf76..befe049052f 100644
--- a/NEWS
+++ b/NEWS
@@ -166,6 +166,10 @@ documents those changes that are of interest to users and admins.
 =========================
  -- In sched/wiki and sched/wiki2, support non-zero UPDATE_TIME specification
     for GETNODES and GETJOBS commands.
+ -- Bug fix: avoid sending the same accounting information multiple times.
+    Patch from Hongjia Cao (NUDT).
+ -- Spread the EPILOG_COMPLETE messages from slurmd to slurmctld out in time
+    to avoid message congestion and retransmissions.
 
 * Changes in SLURM 1.2.23
 =========================
diff --git a/src/slurmd/slurmd/req.c b/src/slurmd/slurmd/req.c
index 1bd7bae75f7..b5c5a7daa34 100644
--- a/src/slurmd/slurmd/req.c
+++ b/src/slurmd/slurmd/req.c
@@ -102,6 +102,7 @@ typedef struct {
 static int  _abort_job(uint32_t job_id);
 static int  _abort_step(uint32_t job_id, uint32_t step_id);
 static char ** _build_env(uint32_t jobid, uid_t uid, char *bg_part_id);
+static void _delay_rpc(int host_inx, int host_cnt, int usec_per_rpc);
 static void _destroy_env(char **env);
 static bool _slurm_authorized_user(uid_t uid);
 static void _job_limits_free(void *x);
@@ -134,6 +135,7 @@ static int  _run_epilog(uint32_t jobid, uid_t uid, char *bg_part_id);
 
 static bool _pause_for_job_completion(uint32_t jobid, char *nodes, 
 		int maxtime);
+static void _sync_messages_kill(kill_job_msg_t *req);
 static int _waiter_init (uint32_t jobid);
 static int _waiter_complete (uint32_t jobid);
 
@@ -2122,8 +2124,6 @@ _epilog_complete(uint32_t jobid, int rc)
 
 	slurm_msg_t_init(&msg);
 	
-	_wait_state_completed(jobid, 5);
-
 	req.job_id      = jobid;
 	req.return_code = rc;
 	req.node_name   = conf->node_name;
@@ -2489,8 +2489,75 @@ _rpc_terminate_job(slurm_msg_t *msg)
 		debug("completed epilog for jobid %u", req->job_id);
 	
     done:
-	_epilog_complete(req->job_id, rc);
+	_wait_state_completed(req->job_id, 5);
 	_waiter_complete(req->job_id);
+	_sync_messages_kill(req);
+	_epilog_complete(req->job_id, rc);
+}
+
+/* On a parallel job, every slurmd may send its EPILOG_COMPLETE
+ * message to the slurmctld at the same time, resulting in lost
+ * messages. Add a delay here to spread out the message traffic,
+ * assuming synchronized clocks across the cluster.
+ * Allow 10 msec of processing time in slurmctld for each RPC. */
+static void _sync_messages_kill(kill_job_msg_t *req)
+{
+	int host_cnt, host_inx;
+	char *host;
+	hostset_t hosts;
+
+	hosts = hostset_create(req->nodes);
+	host_cnt = hostset_count(hosts);
+	if (host_cnt <= 32)
+		goto fini;
+	if (conf->hostname == NULL)
+		goto fini;	/* should never happen */
+
+	for (host_inx=0; host_inx<host_cnt; host_inx++) {
+		host = hostset_shift(hosts);
+		if (host == NULL)
+			break;
+		if (strcmp(host, conf->node_name) == 0) {
+			free(host);
+			break;
+		}
+		free(host);
+	}
+	_delay_rpc(host_inx, host_cnt, 10000);
+
+ fini:	hostset_destroy(hosts);
+}
+
+/* Delay a message based upon the host index, total host count, and
+ * usec_per_rpc. This logic depends upon synchronized clocks across the cluster. */
+static void _delay_rpc(int host_inx, int host_cnt, int usec_per_rpc)
+{
+	struct timeval tv1;
+	uint32_t cur_time;	/* current time in usec (just 9 digits) */
+	uint32_t tot_time;	/* total time expected for all RPCs */
+	uint32_t offset_time;	/* relative time within tot_time */
+	uint32_t target_time;	/* desired time to issue the RPC */
+	uint32_t delta_time;
+
+again:	if (gettimeofday(&tv1, NULL)) {
+		usleep(host_inx * usec_per_rpc);
+		return;
+	}
+
+	cur_time = ((tv1.tv_sec % 1000) * 1000000) + tv1.tv_usec;
+	tot_time = host_cnt * usec_per_rpc;
+	offset_time = cur_time % tot_time;
+	target_time = host_inx * usec_per_rpc;
+	if (target_time < offset_time)
+		delta_time = target_time - offset_time + tot_time;
+	else
+		delta_time = target_time - offset_time;
+	if (usleep(delta_time)) {
+		if (errno == EINVAL) /* usleep for more than 1 sec */
+			usleep(900000);
+		/* errno == EINTR */
+		goto again;
+	}
 }
 
 /*
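For illustration, here is a minimal standalone sketch (not part of the patch) of
the time-slot scheme that _delay_rpc() implements: with synchronized clocks, every
host computes the same repeating window of host_cnt * usec_per_rpc microseconds
and sleeps until its own slot within it, so the EPILOG_COMPLETE RPCs reach
slurmctld roughly usec_per_rpc apart. The file, function, and variable names
below are hypothetical; only the arithmetic mirrors the patch.

	/* delay_demo.c - sketch of the staggered-RPC delay (hypothetical names) */
	#include <stdint.h>
	#include <stdio.h>
	#include <sys/time.h>
	#include <unistd.h>

	static void delay_rpc_demo(int host_inx, int host_cnt, int usec_per_rpc)
	{
		struct timeval tv;
		uint32_t cur_time, tot_time, offset_time, target_time, delta_time;

		if (gettimeofday(&tv, NULL)) {
			usleep(host_inx * usec_per_rpc);	/* no clock: linear fallback */
			return;
		}
		cur_time    = ((tv.tv_sec % 1000) * 1000000) + tv.tv_usec; /* usec, 9 digits */
		tot_time    = host_cnt * usec_per_rpc;	/* length of one full window */
		offset_time = cur_time % tot_time;	/* where we are in the window */
		target_time = host_inx * usec_per_rpc;	/* this host's slot */
		if (target_time < offset_time)		/* slot passed; wait for next window */
			delta_time = target_time - offset_time + tot_time;
		else
			delta_time = target_time - offset_time;
		usleep(delta_time);
	}

	int main(void)
	{
		delay_rpc_demo(5, 100, 10000);	/* host 5 of 100, 10 msec per RPC */
		printf("slot reached; EPILOG_COMPLETE would be sent now\n");
		return 0;
	}

Because each host waits only for its slot within the current window, the worst-case
delay is bounded by one window (here 100 * 10 msec = 1 second) rather than growing
with the host index.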
diff --git a/src/slurmd/slurmstepd/mgr.c b/src/slurmd/slurmstepd/mgr.c
index dcc4a205824..22405f1984f 100644
--- a/src/slurmd/slurmstepd/mgr.c
+++ b/src/slurmd/slurmstepd/mgr.c
@@ -139,6 +139,7 @@ step_complete_t step_complete = {
 	{},
 	-1,
 	-1,
+	true,
 	(bitstr_t *)NULL,
 	0,
         NULL
@@ -478,6 +479,8 @@ _wait_for_children_slurmstepd(slurmd_job_t *job)
 		step_complete.step_rc = MAX(step_complete.step_rc,
 					 WEXITSTATUS(job->task[i]->estatus));
 
+	step_complete.wait_children = false;
+
 	pthread_mutex_unlock(&step_complete.lock);
 }
 
@@ -495,6 +498,7 @@ _one_step_complete_msg(slurmd_job_t *job, int first, int last)
 	int rc = -1;
 	int retcode;
 	int i;
+	static bool acct_sent = false;
 
 	debug2("_one_step_complete_msg: first=%d, last=%d", first, last);
 	msg.job_id = job->jobid;
@@ -504,9 +508,12 @@ _one_step_complete_msg(slurmd_job_t *job, int first, int last)
 	msg.step_rc = step_complete.step_rc;
 	msg.jobacct = jobacct_gather_g_create(NULL);
 	/************* acct stuff ********************/
-	jobacct_gather_g_aggregate(step_complete.jobacct, job->jobacct);
-	jobacct_gather_g_getinfo(step_complete.jobacct, JOBACCT_DATA_TOTAL, 
-			  msg.jobacct);
+	if (!acct_sent) {
+		jobacct_gather_g_aggregate(step_complete.jobacct, job->jobacct);
+		jobacct_gather_g_getinfo(step_complete.jobacct, JOBACCT_DATA_TOTAL,
+				  msg.jobacct);
+		acct_sent = true;
+	}
 	/*********************************************/	
 	slurm_msg_t_init(&req);
 	req.msg_type = REQUEST_STEP_COMPLETE;
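The mgr.c hunk above relies on a common send-once pattern: a static flag ensures
the aggregated accounting data rides along on only the first REQUEST_STEP_COMPLETE,
even when the message is rebuilt for retries. Below is a minimal self-contained
sketch of that pattern; attach_acct_once() and the integer totals are illustrative
stand-ins for the jobacct_gather_g_* plugin calls, not the real API.

	#include <stdbool.h>
	#include <stdio.h>

	static int step_total_cpu;	/* stands in for step_complete.jobacct */

	/* Return the accounting payload for this completion message: real
	 * totals the first time, nothing on every rebuild/retry after that. */
	static int attach_acct_once(int my_cpu_usage)
	{
		static bool acct_sent = false;	/* persists across calls */

		if (acct_sent)
			return 0;		/* already reported; send nothing */
		step_total_cpu += my_cpu_usage;	/* like jobacct_gather_g_aggregate() */
		acct_sent = true;
		return step_total_cpu;		/* like jobacct_gather_g_getinfo() */
	}

	int main(void)
	{
		printf("first message carries %d\n", attach_acct_once(42));	/* 42 */
		printf("retry carries %d\n", attach_acct_once(42));		/* 0 */
		return 0;
	}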
diff --git a/src/slurmd/slurmstepd/req.c b/src/slurmd/slurmstepd/req.c
index ccf032042ca..857d9d46203 100644
--- a/src/slurmd/slurmstepd/req.c
+++ b/src/slurmd/slurmstepd/req.c
@@ -1132,6 +1132,12 @@ _handle_completion(int fd, slurmd_job_t *job, uid_t uid)
 	 * Record the completed nodes
 	 */
 	pthread_mutex_lock(&step_complete.lock);
+	if (!step_complete.wait_children) {
+		rc = -1;
+		errnum = ETIMEDOUT; /* not used anyway */
+		goto timeout;
+	}
+
 /* 	debug2("Setting range %d(bit %d) through %d(bit %d)", */
 /* 	       first, first-(step_complete.rank+1), */
 /* 	       last, last-(step_complete.rank+1)); */
@@ -1146,6 +1152,7 @@ _handle_completion(int fd, slurmd_job_t *job, uid_t uid)
 	
 	/************* acct stuff ********************/
 	jobacct_gather_g_aggregate(step_complete.jobacct, jobacct);
+timeout:
 	jobacct_gather_g_destroy(jobacct);
 	/*********************************************/
 	
diff --git a/src/slurmd/slurmstepd/slurmstepd.h b/src/slurmd/slurmstepd/slurmstepd.h
index ef67ba3f094..8cc15eaedd2 100644
--- a/src/slurmd/slurmstepd/slurmstepd.h
+++ b/src/slurmd/slurmstepd/slurmstepd.h
@@ -55,6 +55,7 @@ typedef struct {
 	slurm_addr parent_addr;
 	int children;
 	int max_depth;
+	bool wait_children;
 	bitstr_t *bits;
 	int step_rc;
 	jobacctinfo_t *jobacct;
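Taken together, the new wait_children flag and the _handle_completion() guard
implement a late-arrival cutoff: once _wait_for_children_slurmstepd() clears the
flag, a straggling child completion is rejected and, most importantly, its
accounting data is never aggregated into totals that were already sent. A compact
sketch of that guard follows; the struct and function names are illustrative, not
the real slurmstepd API.

	#include <errno.h>
	#include <pthread.h>
	#include <stdbool.h>

	struct step_state {
		pthread_mutex_t lock;
		bool wait_children;	/* cleared once the parent stops waiting */
		int acct_total;		/* aggregated child accounting */
	};

	/* Mirrors the _handle_completion() guard: reject completions that
	 * arrive after the cutoff so their usage is never counted twice. */
	static int handle_completion(struct step_state *s, int child_acct)
	{
		int rc = 0;

		pthread_mutex_lock(&s->lock);
		if (!s->wait_children) {
			rc = -ETIMEDOUT;	/* too late; totals already reported */
			goto done;
		}
		s->acct_total += child_acct;	/* counted exactly once */
	done:
		pthread_mutex_unlock(&s->lock);
		return rc;
	}

	int main(void)
	{
		struct step_state s = { PTHREAD_MUTEX_INITIALIZER, true, 0 };

		handle_completion(&s, 7);	/* accepted: acct_total becomes 7 */
		s.wait_children = false;	/* parent stops waiting */
		return handle_completion(&s, 7) ? 1 : 0;	/* rejected */
	}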
-- 
GitLab