From 8cf7f218390e4997b576b9fc13cdb9e0790dda0a Mon Sep 17 00:00:00 2001
From: "David J. Bremer" <dbremer@llnl.gov>
Date: Thu, 23 Jul 2009 20:58:50 +0000
Subject: [PATCH] Added a list of starting steps to handle slurmstepd processes
 that are slow to fork or exec and could therefore miss a request to cancel
 and run indefinitely.

Also addressed a race condition for jobs that were cancelled
or hit their time limit, which might cause them to stay in
completing state longer than necessary.
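
The heart of the change is a mutex/condition-variable handshake between
the step-launch path and the job-termination path in slurmd.  The
standalone sketch below is illustrative only: the names are placeholders
rather than slurmd code, and a simple counter stands in for the patch's
List of (job_id, step_id) entries.

    #include <pthread.h>
    #include <stdio.h>
    #include <unistd.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
    static int starting = 0;            /* steps not yet reachable by RPC */

    static void *launch_step(void *arg)
    {
        (void) arg;
        pthread_mutex_lock(&lock);
        starting++;                      /* like _add_starting_step() */
        pthread_mutex_unlock(&lock);

        sleep(2);                        /* slow fork/exec/check-in */

        pthread_mutex_lock(&lock);
        starting--;                      /* like _remove_starting_step() */
        pthread_cond_broadcast(&cond);
        pthread_mutex_unlock(&lock);
        return NULL;
    }

    static void terminate_job(void)
    {
        pthread_mutex_lock(&lock);
        while (starting > 0)             /* like _wait_for_starting_step() */
            pthread_cond_wait(&cond, &lock);
        pthread_mutex_unlock(&lock);
        printf("no steps starting; safe to signal them now\n");
    }

    int main(void)
    {
        pthread_t tid;
        pthread_create(&tid, NULL, launch_step, NULL);
        sleep(1);                        /* let the step begin starting */
        terminate_job();
        pthread_join(tid, NULL);
        return 0;
    }

Waiting on the condition variable (instead of polling) means the
terminate RPC can return EAGAIN to the controller and then block only
until every starting step has checked in, so a slow-starting step can
no longer miss the cancel request.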
---
 src/slurmd/slurmd/req.c    | 237 ++++++++++++++++++++++++++++++++++++-
 src/slurmd/slurmd/slurmd.c |   7 ++
 src/slurmd/slurmd/slurmd.h |   5 +
 3 files changed, 248 insertions(+), 1 deletion(-)

diff --git a/src/slurmd/slurmd/req.c b/src/slurmd/slurmd/req.c
index bee6acace8f..6b4e10125d6 100644
--- a/src/slurmd/slurmd/req.c
+++ b/src/slurmd/slurmd/req.c
@@ -100,6 +100,11 @@ typedef struct {
 	uint32_t job_mem;
 } job_mem_limits_t;
 
+typedef struct {
+	uint32_t job_id;
+	uint32_t step_id;
+} starting_step_t;
+
 static int  _abort_job(uint32_t job_id);
 static int  _abort_step(uint32_t job_id, uint32_t step_id);
 static char **_build_env(uint32_t jobid, uid_t uid, char *resv_id, 
@@ -153,6 +158,12 @@ static long _get_job_uid(uint32_t jobid);
 
 static gids_t *_gids_cache_lookup(char *user, gid_t gid);
 
+static int  _add_starting_step(slurmd_step_type_t type, void *req);
+static int  _remove_starting_step(slurmd_step_type_t type, void *req);
+static int  _compare_starting_steps(void *s0, void *s1);
+static int  _wait_for_starting_step(uint32_t job_id, uint32_t step_id);
+static bool _step_is_starting(uint32_t job_id, uint32_t step_id);
+
 /*
  *  List of threads waiting for jobs to complete
  */
@@ -521,12 +532,18 @@ _forkexec_slurmstepd(slurmd_step_type_t type, void *req,
 		return SLURM_FAILURE;
 	}
 
+	if (_add_starting_step(type, req)) {
+		error("_forkexec_slurmstepd failed in _add_starting_step: %m");
+		return SLURM_FAILURE;
+	}
+
 	if ((pid = fork()) < 0) {
 		error("_forkexec_slurmstepd: fork: %m");
 		close(to_stepd[0]);
 		close(to_stepd[1]);
 		close(to_slurmd[0]);
 		close(to_slurmd[1]);
+		_remove_starting_step(type, req);
 		return SLURM_FAILURE;
 	} else if (pid > 0) {
 		int rc = 0;
@@ -554,6 +571,9 @@ _forkexec_slurmstepd(slurmd_step_type_t type, void *req,
 		}
 
 	done:
+		if (_remove_starting_step(type, req))
+			error("Error cleaning up starting_step list");
+
 		/* Reap child */
 		if (waitpid(pid, NULL, 0) < 0)
 			error("Unable to reap slurmd child process");
@@ -2883,7 +2903,10 @@ _rpc_terminate_job(slurm_msg_t *msg)
 	 */
 	if (_waiter_init(req->job_id) == SLURM_ERROR) {
 		if (msg->conn_fd >= 0) {
-			slurm_send_rc_msg (msg, SLURM_SUCCESS);
+			if (_step_is_starting(req->job_id, NO_VAL))
+				slurm_send_rc_msg (msg, EAGAIN);
+			else
+				slurm_send_rc_msg (msg, SLURM_SUCCESS);
 		}
 		return;
 	}
@@ -2898,6 +2921,30 @@ _rpc_terminate_job(slurm_msg_t *msg)
 		debug("credential for job %u revoked", req->job_id);
 	}
 
+	/*
+	 * Before signalling steps, if the job has any steps that are still
+	 * forking, execing, or checking in with slurmd, wait on a condition
+	 * variable for the start to finish.  Otherwise a slow-starting step
+	 * could miss the job termination message and run indefinitely.
+	 */
+	if (_step_is_starting(req->job_id, NO_VAL)) {
+		if (msg->conn_fd >= 0) {
+			debug4("sent EAGAIN");
+			slurm_send_rc_msg (msg, EAGAIN);
+			if (slurm_close_accepted_conn(msg->conn_fd) < 0)
+				error ( "rpc_kill_job: close(%d): %m", 
+					msg->conn_fd);
+			msg->conn_fd = -1;
+		}
+		if (_wait_for_starting_step(req->job_id, NO_VAL)) {
+			/*
+			 * There's currently no case in which we enter this
+			 * error condition.  If there were, it's hard to say
+			 * whether to proceed with the job termination.
+			 */
+			error("Error in _wait_for_starting_step");
+		}
+	}
 	if (IS_JOB_NODE_FAILED(req) || IS_JOB_PENDING(req)) /* requeued */
 		_kill_all_active_steps(req->job_id, SIG_NODE_FAIL, true);
 	else if (IS_JOB_FAILED(req))
@@ -2936,7 +2983,17 @@ _rpc_terminate_job(slurm_msg_t *msg)
 			slurm_send_rc_msg(msg,
 					  ESLURMD_KILL_JOB_ALREADY_COMPLETE);
 		slurm_cred_begin_expiration(conf->vctx, req->job_id);
+		save_cred_state(conf->vctx);
 		_waiter_complete(req->job_id);
+
+		/* 
+		 * The controller needs to get MESSAGE_EPILOG_COMPLETE to bring
+		 * the job out of "completing" state.  Otherwise, the job
+		 * could remain "completing" unnecessarily, until the request 
+		 * to terminate is resent.
+		 */
+		_sync_messages_kill(req);
+		_epilog_complete(req->job_id, rc);
 		return;
 	}
 #endif
@@ -3498,3 +3555,181 @@ init_gids_cache(int cache)
 	setgroups(ngids, orig_gids);		
 	xfree(orig_gids);
 }
+
+
+static int 
+_add_starting_step(slurmd_step_type_t type, void *req)
+{
+	starting_step_t *starting_step;
+	int rc = SLURM_SUCCESS;
+
+	/* Add the step info to a list of starting processes that 
+	   cannot reliably be contacted. */
+	slurm_mutex_lock(&conf->starting_steps_lock);
+	starting_step = xmalloc(sizeof(starting_step_t));
+	if (!starting_step) {
+		error("_add_starting_step failed to allocate memory");
+		rc = SLURM_FAILURE;
+		goto fail;
+	}
+	switch(type) {
+	case LAUNCH_BATCH_JOB:
+		starting_step->job_id = 
+			((batch_job_launch_msg_t *)req)->job_id;
+		starting_step->step_id = 
+			((batch_job_launch_msg_t *)req)->step_id;
+		break;
+	case LAUNCH_TASKS:
+		starting_step->job_id = 
+			((launch_tasks_request_msg_t *)req)->job_id;
+		starting_step->step_id = 
+			((launch_tasks_request_msg_t *)req)->job_step_id;
+		break;
+	default:
+		error("_add_starting_step called with an invalid type");
+		rc = SLURM_FAILURE;
+		xfree(starting_step);
+		goto fail;
+	}
+	if (!list_append(conf->starting_steps, starting_step)) {
+		error("_add_starting_step failed to allocate memory for list");
+		rc = SLURM_FAILURE;
+		xfree(starting_step);
+		goto fail;
+	}
+
+fail:
+	slurm_mutex_unlock(&conf->starting_steps_lock);
+	return rc;
+}
+
+
+static int 
+_remove_starting_step(slurmd_step_type_t type, void *req)
+{
+	uint32_t job_id, step_id;
+	ListIterator iter;
+	starting_step_t *starting_step;
+	int rc = SLURM_SUCCESS;
+	bool found = false;
+
+	slurm_mutex_lock(&conf->starting_steps_lock);
+
+	switch(type) {
+	case LAUNCH_BATCH_JOB:
+		job_id =  ((batch_job_launch_msg_t *)req)->job_id;
+		step_id = ((batch_job_launch_msg_t *)req)->step_id;
+		break;
+	case LAUNCH_TASKS:
+		job_id =  ((launch_tasks_request_msg_t *)req)->job_id;
+		step_id = ((launch_tasks_request_msg_t *)req)->job_step_id;
+		break;
+	default:
+		error("_remove_starting_step called with an invalid type");
+		rc = SLURM_FAILURE;
+		goto fail;
+	}
+
+	iter = list_iterator_create(conf->starting_steps);
+	while ((starting_step = list_next(iter))) {
+		if (starting_step->job_id  == job_id &&
+		    starting_step->step_id == step_id) {
+			starting_step = list_remove(iter);
+			xfree(starting_step);
+
+			found = true;
+			pthread_cond_broadcast(&conf->starting_steps_cond);
+			break;
+		}
+	}
+	if (!found) {
+		error("_remove_starting_step: step not found");
+		rc = SLURM_FAILURE;
+	}
+fail:
+	slurm_mutex_unlock(&conf->starting_steps_lock);
+	return rc;
+}
+
+
+
+static int _compare_starting_steps(void *listentry, void *key)
+{
+	starting_step_t *step0 = (starting_step_t *)listentry; 
+	starting_step_t *step1 = (starting_step_t *)key;
+
+	if (step1->step_id != NO_VAL)
+		return (step0->job_id  == step1->job_id &&
+			step0->step_id == step1->step_id);
+	else
+		return (step0->job_id  == step1->job_id);
+}
+
+
+/* Wait for a step to get far enough in the launch process to have
+   a socket open, ready to handle RPC calls.  Pass step_id = NO_VAL
+   to wait on any step for the given job. */
+
+static int _wait_for_starting_step(uint32_t job_id, uint32_t step_id)
+{
+	starting_step_t  starting_step;
+	int num_passes = 0;
+	starting_step.job_id  = job_id;
+	starting_step.step_id = step_id;
+
+	slurm_mutex_lock(&conf->starting_steps_lock);
+
+	while (list_find_first( conf->starting_steps, 
+				&_compare_starting_steps,
+				&starting_step )) {
+		if (num_passes == 0) {
+			if (step_id != NO_VAL)
+				debug( "Blocked waiting for step %d.%d", 
+					job_id, step_id);
+			else
+				debug( "Blocked waiting for job %d, all steps",
+					job_id);
+		}
+		num_passes++;
+
+		pthread_cond_wait(&conf->starting_steps_cond,
+				  &conf->starting_steps_lock);
+	}
+	if (num_passes > 0) {
+		if (step_id != NO_VAL)
+			debug( "Finished wait for step %d.%d", 
+				job_id, step_id);
+		else
+			debug( "Finished wait for job %d, all steps",
+				job_id);
+	}
+	slurm_mutex_unlock(&conf->starting_steps_lock);
+
+	return SLURM_SUCCESS;
+}
+
+
+/* Return true if the step has not yet confirmed that its socket to 
+   handle RPC calls has been created.  Pass step_id = NO_VAL
+   to return true if any of the job's steps are still starting. */
+static bool _step_is_starting(uint32_t job_id, uint32_t step_id)
+{
+	starting_step_t  starting_step;
+	bool ret = false;
+	starting_step.job_id  = job_id;
+	starting_step.step_id = step_id;
+
+	slurm_mutex_lock(&conf->starting_steps_lock);
+
+	if (list_find_first( conf->starting_steps, 
+			     &_compare_starting_steps,
+			     &starting_step )) {
+		ret = true;
+	}
+
+	slurm_mutex_unlock(&conf->starting_steps_lock);
+	return ret;
+}
+
+
+
diff --git a/src/slurmd/slurmd/slurmd.c b/src/slurmd/slurmd/slurmd.c
index 5c326ca2412..413f0ef0398 100644
--- a/src/slurmd/slurmd/slurmd.c
+++ b/src/slurmd/slurmd/slurmd.c
@@ -894,6 +894,10 @@ _init_conf(void)
 	conf->spooldir	  = xstrdup(DEFAULT_SPOOLDIR);
 
 	slurm_mutex_init(&conf->config_mutex);
+
+	conf->starting_steps = list_create(NULL);
+	slurm_mutex_init(&conf->starting_steps_lock);
+	pthread_cond_init(&conf->starting_steps_cond, NULL);
 	return;
 }
 
@@ -919,6 +923,9 @@ _destroy_conf(void)
 		xfree(conf->stepd_loc);
 		xfree(conf->tmpfs);
 		slurm_mutex_destroy(&conf->config_mutex);
+		list_destroy(conf->starting_steps);
+		slurm_mutex_destroy(&conf->starting_steps_lock);
+		pthread_cond_destroy(&conf->starting_steps_cond);
 		slurm_cred_ctx_destroy(conf->vctx);
 		xfree(conf);
 	}
diff --git a/src/slurmd/slurmd/slurmd.h b/src/slurmd/slurmd/slurmd.h
index f4a89129b71..31f82f28350 100644
--- a/src/slurmd/slurmd/slurmd.h
+++ b/src/slurmd/slurmd/slurmd.h
@@ -130,6 +130,11 @@ typedef struct slurmd_config {
 	uint16_t	task_plugin_param; /* TaskPluginParams, expressed
 					 * using cpu_bind_type_t flags */
 	uint16_t	propagate_prio;	/* PropagatePrioProcess flag       */
+
+	List		starting_steps; /* steps that are starting but cannot 
+					   receive RPCs yet */
+	pthread_mutex_t	starting_steps_lock;
+	pthread_cond_t	starting_steps_cond;
 } slurmd_conf_t;
 
 extern slurmd_conf_t * conf;
-- 
GitLab