From 272c13a7ea7f3ea769d1700d4ef65f7fd9d45b09 Mon Sep 17 00:00:00 2001
From: Mark Grondona <mgrondona@llnl.gov>
Date: Fri, 20 Jun 2003 19:02:20 +0000
Subject: [PATCH]  o add support for timing out hung threads (e.g., stuck in
 connect())  o fail the launch thread when job state is no longer "LAUNCHING"

---
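Note (commentary, not part of the commit): the thread-timeout mechanism added
below works by interrupting a launch thread that is blocked in a slow system
call such as connect().  The standalone sketch here illustrates that pattern
under the assumptions the patch appears to make: a no-op SIGALRM handler
registered without SA_RESTART so the blocked call returns EINTR, and
pthread_kill() used to deliver the signal to one specific thread.  The helper
names and the unroutable test address are illustrative only, not taken from
the SLURM sources.

/* sketch: interrupt a thread stuck in connect() by sending it SIGALRM */
#include <arpa/inet.h>
#include <errno.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

static void alrm_handler(int signo) { (void)signo; }  /* no-op: only unblocks syscalls */

static void *slow_connect(void *arg)
{
	struct sockaddr_in sin;
	int fd = socket(AF_INET, SOCK_STREAM, 0);

	(void)arg;
	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port   = htons(9);                          /* discard port */
	inet_pton(AF_INET, "10.255.255.1", &sin.sin_addr);  /* unroutable: connect() hangs */

	if (connect(fd, (struct sockaddr *)&sin, sizeof(sin)) < 0 && errno == EINTR)
		printf("connect() interrupted by SIGALRM\n");

	close(fd);
	return NULL;
}

int main(void)
{
	struct sigaction act;
	pthread_t tid;

	memset(&act, 0, sizeof(act));
	act.sa_handler = alrm_handler;
	sigemptyset(&act.sa_mask);       /* no SA_RESTART: connect() will see EINTR */
	sigaction(SIGALRM, &act, NULL);

	pthread_create(&tid, NULL, slow_connect, NULL);
	sleep(2);                        /* assume the thread is stuck by now */
	pthread_kill(tid, SIGALRM);      /* same idea as _check_pending_threads() */
	pthread_join(tid, NULL);
	return 0;
}
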
 src/srun/launch.c | 169 ++++++++++++++++++++++++++++++++++------------
 1 file changed, 124 insertions(+), 45 deletions(-)

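A second sketch (also commentary, not part of the diff) of the one-second
timed wait that _wait_on_active() performs while the main thread is throttled
on opt.max_threads: pthread_cond_timedwait() takes an absolute deadline, so
the current time from gettimeofday() is converted into a struct timespec one
second in the future, and ETIMEDOUT becomes the cue to go check the pending
launch threads.

/* sketch: absolute one-second deadline for pthread_cond_timedwait() */
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <sys/time.h>
#include <time.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;

int main(void)
{
	struct timeval  now;
	struct timespec deadline;
	int rc;

	pthread_mutex_lock(&lock);

	gettimeofday(&now, NULL);
	deadline.tv_sec  = now.tv_sec + 1;      /* wake up in about one second */
	deadline.tv_nsec = now.tv_usec * 1000;  /* microseconds -> nanoseconds */

	/* nothing ever signals cond here, so this returns ETIMEDOUT after ~1s */
	rc = pthread_cond_timedwait(&cond, &lock, &deadline);
	if (rc == ETIMEDOUT)
		printf("timed out: this is where pending threads would be checked\n");

	pthread_mutex_unlock(&lock);
	return 0;
}
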
diff --git a/src/srun/launch.c b/src/srun/launch.c
index ffeabff16e6..baa035d313e 100644
--- a/src/srun/launch.c
+++ b/src/srun/launch.c
@@ -38,6 +38,7 @@
 #include "src/common/log.h"
 #include "src/common/slurm_protocol_api.h"
 #include "src/common/xmalloc.h"
+#include "src/common/xsignal.h"
 
 #include "src/srun/job.h"
 #include "src/srun/launch.h"
@@ -53,19 +54,23 @@ static int             fail_launch_cnt = 0;
 
 typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t;
 
+typedef struct task_info {
+	slurm_msg_t *req;
+	job_t *job;
+} task_info_t;
+
 typedef struct thd {
         pthread_t	thread;			/* thread ID */
-        pthread_attr_t	attr;			/* thread attributes */
         state_t		state;      		/* thread state */
+	time_t          tstart;			/* time thread started */
+	task_info_t     task;
 } thd_t;
 
-typedef struct task_info {
-	slurm_msg_t *req_ptr;
-	job_t *job_ptr;
-} task_info_t;
-
 static void   _dist_block(job_t *job);
 static void   _dist_cyclic(job_t *job);
+static int    _check_pending_threads(thd_t *thd, int count);
+static void   _spawn_launch_thr(thd_t *th);
+static int    _wait_on_active(thd_t *thd, job_t *job);
 static void   _p_launch(slurm_msg_t *req_array_ptr, job_t *job);
 static void * _p_launch_task(void *args);
 static void   _print_launch_msg(launch_tasks_request_msg_t *msg, 
@@ -227,64 +232,118 @@ launch(void *arg)
 	return(void *)(0);
 }
 
+static int _check_pending_threads(thd_t *thd, int count)
+{
+	int i;
+	time_t now = time(NULL);
+
+	for (i = 0; i < count; i++) {
+		if ((thd[i].state == DSH_ACTIVE) 
+		    && ((now - thd[i].tstart) >= 2) ) 
+			pthread_kill(thd[i].thread, SIGALRM);
+	}
+
+	return 0;
+}
+
+
+static void _spawn_launch_thr(thd_t *th)
+{
+	pthread_attr_t attr;
+	int err = 0;
+
+	if ((err = pthread_attr_init (&attr)))
+		fatal ("pthread_attr_init: %s", slurm_strerror(err));
+
+
+	err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+	if (err)
+		error ("pthread_attr_setdetachstate: %s", slurm_strerror(err));
+
+#ifdef PTHREAD_SCOPE_SYSTEM
+	if ((err = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)))
+		error ("pthread_attr_setscope: %s", slurm_strerror(err));
+#endif
+
+	err = pthread_create(&th->thread, &attr, _p_launch_task, (void *)th);
+	if (err) {
+		error ("pthread_create: %s", slurm_strerror(err));
+
+		/* just run it under this thread */
+		_p_launch_task((void *) th);
+	}
+
+	return;
+}
+
+static int _wait_on_active(thd_t *thd, job_t *job)
+{
+	struct timeval now;
+	struct timespec timeout;
+	int rc;
+
+	gettimeofday(&now, NULL);
+	timeout.tv_sec  = now.tv_sec + 1;
+	timeout.tv_nsec = now.tv_usec * 1000;
+
+	rc = pthread_cond_timedwait( &active_cond, 
+			             &active_mutex,
+			             &timeout      );
+
+	if (rc == ETIMEDOUT)
+		_check_pending_threads(thd, job->nhosts);
+
+	return rc;
+}
+
+static void _alrm_handler(int signo) { }
+
 /* _p_launch - parallel (multi-threaded) task launcher */
-static void _p_launch(slurm_msg_t *req_array_ptr, job_t *job)
+static void _p_launch(slurm_msg_t *req, job_t *job)
 {
 	int i;
-	task_info_t *task_info_ptr;
-	thd_t *thread_ptr;
+	thd_t *thd;
+	int rc = 0;
+	SigFunc *oldh;
 
+	oldh = xsignal(SIGALRM, (SigFunc *) _alrm_handler);
 	/*
 	 * Set job timeout to maximum launch time + current time
 	 */
 	job->ltimeout = time(NULL) + opt.max_launch_time;
 
-	thread_ptr = xmalloc (job->nhosts * sizeof (thd_t));
+	thd = xmalloc (job->nhosts * sizeof (thd_t));
 	for (i = 0; i < job->nhosts; i++) {
+
 		if (job->ntask[i] == 0)	{	/* No tasks for this node */
 			debug("Node %s is unused",job->host[i]);
 			continue;
 		}
 
 		pthread_mutex_lock(&active_mutex);
-		while (active >= opt.max_threads) {
-			pthread_cond_wait(&active_cond, &active_mutex);
-		}
+		while (active >= opt.max_threads || rc < 0) 
+			rc = _wait_on_active(thd, job);
+
 		active++;
 		pthread_mutex_unlock(&active_mutex);
 
-		task_info_ptr = (task_info_t *)xmalloc(sizeof(task_info_t));
-		task_info_ptr->req_ptr = &req_array_ptr[i];
-		task_info_ptr->job_ptr = job;
-
-		if (pthread_attr_init (&thread_ptr[i].attr))
-			fatal ("pthread_attr_init error %m");
-		if (pthread_attr_setdetachstate (&thread_ptr[i].attr, 
-						 PTHREAD_CREATE_DETACHED))
-			error ("pthread_attr_setdetachstate error %m");
-#ifdef PTHREAD_SCOPE_SYSTEM
-		if (pthread_attr_setscope (&thread_ptr[i].attr, 
-					   PTHREAD_SCOPE_SYSTEM))
-			error ("pthread_attr_setscope error %m");
-#endif
+		if (job->state > SRUN_JOB_LAUNCHING)
+			break;
 
-		if ( pthread_create (	&thread_ptr[i].thread, 
-		                        &thread_ptr[i].attr, 
-		                        _p_launch_task, 
-		                        (void *) task_info_ptr) ) {
-			error ("pthread_create error %m");
-			/* just run it under this thread */
-			_p_launch_task((void *) task_info_ptr);
-		}
+		thd[i].task.req = &req[i];
+		thd[i].task.job = job;
 
+		_spawn_launch_thr(&thd[i]);
 	}
 
 	pthread_mutex_lock(&active_mutex);
-	while (active > 0) {
-		pthread_cond_wait(&active_cond, &active_mutex);
-	}
+	while (active > 0) 
+		_wait_on_active(thd, job);
 	pthread_mutex_unlock(&active_mutex);
-	xfree(thread_ptr);
+
+	xsignal(SIGALRM, oldh);
+
+	xfree(thd);
 }
 
 static int
@@ -305,9 +364,12 @@ _send_msg_rc(slurm_msg_t *msg)
 static void
 _update_failed_node(job_t *j, int id)
 {
+	int i;
 	pthread_mutex_lock(&j->task_mutex);
 	if (j->host_state[id] == SRUN_HOST_INIT)
 		j->host_state[id] = SRUN_HOST_UNREACHABLE;
+	for (i = 0; i < j->ntask[id]; i++)
+		j->task_state[j->tids[id][i]] = SRUN_TASK_FAILED;
 	pthread_mutex_unlock(&j->task_mutex);
 
 	/* update_failed_tasks(j, id); */
@@ -326,27 +388,44 @@ _update_contacted_node(job_t *j, int id)
 /* _p_launch_task - parallelized launch of a specific task */
 static void * _p_launch_task(void *arg)
 {
-	task_info_t                *tp     = (task_info_t *)arg;
-	slurm_msg_t                *req    = tp->req_ptr;
+	thd_t                      *th     = (thd_t *)arg;
+	task_info_t                *tp     = &(th->task);
+	slurm_msg_t                *req    = tp->req;
 	launch_tasks_request_msg_t *msg    = req->data;
-	job_t                      *job    = tp->job_ptr;
+	job_t                      *job    = tp->job;
 	int                        nodeid  = msg->srun_node_id;
 	int                        failure = 0;
 	int                        retry   = 3; /* retry thrice */
 
+	th->state  = DSH_ACTIVE;
+	th->tstart = time(NULL);
+
 	if (_verbose)
 	        _print_launch_msg(msg, job->host[nodeid]);
 
     again:
 	if (_send_msg_rc(req) < 0) {	/* Has timeout */
 
-		error("launch error on %s: %m", job->host[nodeid]);
-		if ((errno != ETIMEDOUT) && retry--) {
+		if (errno != EINTR)
+			verbose("launch error on %s: %m", job->host[nodeid]);
+
+		if ((errno != ETIMEDOUT) 
+		    && (job->state == SRUN_JOB_LAUNCHING)
+		    && (errno != ESLURMD_INVALID_JOB_CREDENTIAL) 
+		    &&  retry--                                  ) {
 			sleep(1);
 			goto again;
 		}
 
+		if (errno == EINTR)
+			verbose("launch on %s canceled", job->host[nodeid]);
+		else
+			error("launch error on %s: %m", job->host[nodeid]);
+
 		_update_failed_node(job, nodeid);
+
+		th->state = DSH_FAILED;
+
 		failure = 1;
 
 	} else 
@@ -354,12 +433,12 @@ static void * _p_launch_task(void *arg)
 
 
 	pthread_mutex_lock(&active_mutex);
+	th->state = DSH_DONE;
 	active--;
 	fail_launch_cnt += failure;
 	pthread_cond_signal(&active_cond);
 	pthread_mutex_unlock(&active_mutex);
 
-	xfree(arg);
 	return NULL;
 }
 
-- 
GitLab