Skip to content
Snippets Groups Projects
Commit 272c13a7 authored by Mark Grondona
Browse files

o add support for timing out hanging threads (e.g. stuck in connect())

 o fail launch thread when job state is no longer "LAUNCHING"
parent a43b6d04
No related branches found
No related tags found
No related merge requests found
......@@ -38,6 +38,7 @@
#include "src/common/log.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/xmalloc.h"
#include "src/common/xsignal.h"
#include "src/srun/job.h"
#include "src/srun/launch.h"
......@@ -53,19 +54,23 @@ static int fail_launch_cnt = 0;
/*
 * Launch-thread state, stored in thd_t.state.
 */
typedef enum {
	DSH_NEW,	/* first enumerator (value 0); not assigned explicitly in the visible code */
	DSH_ACTIVE,	/* set by _p_launch_task() when it starts running */
	DSH_DONE,	/* set by _p_launch_task() just before it returns */
	DSH_FAILED	/* set by _p_launch_task() when sending the launch request fails */
} state_t;
/* Arguments for one launch thread: the request to send and the job it belongs to. */
typedef struct task_info {
slurm_msg_t *req; /* launch request message; _p_launch_task() passes it to _send_msg_rc() */
job_t *job; /* job the request is being launched for */
} task_info_t;
/* Bookkeeping for one launch thread; _p_launch() allocates an array with one entry per host. */
typedef struct thd {
pthread_t thread; /* thread ID */
pthread_attr_t attr; /* thread attributes */
state_t state; /* thread state */
time_t tstart; /* time thread started */
task_info_t task; /* request/job pair consumed by _p_launch_task() */
} thd_t;
/*
 * NOTE(review): second definition of task_info, using the req_ptr/job_ptr
 * field names. It reuses the struct tag and typedef name already declared
 * earlier in this chunk (fields req/job), so both cannot coexist in one
 * translation unit. Presumably this is the pre-change side of the rendered
 * diff -- confirm which definition applies.
 */
typedef struct task_info {
slurm_msg_t *req_ptr;
job_t *job_ptr;
} task_info_t;
/* Forward declarations of file-local (static) helpers. */
static void _dist_block(job_t *job);
static void _dist_cyclic(job_t *job);
static int _check_pending_threads(thd_t *thd, int count); /* SIGALRM threads active for 2+ seconds */
static void _spawn_launch_thr(thd_t *th); /* start a detached thread running _p_launch_task() */
static int _wait_on_active(thd_t *thd, job_t *job); /* timed wait on active_cond */
static void _p_launch(slurm_msg_t *req_array_ptr, job_t *job); /* per-host parallel launcher */
static void * _p_launch_task(void *args); /* thread body: send one launch request */
static void _print_launch_msg(launch_tasks_request_msg_t *msg,
......@@ -227,64 +232,118 @@ launch(void *arg)
return(void *)(0);
}
/*
 * _check_pending_threads - signal launch threads that have been running a while.
 *
 * thd    IN  array of launch-thread records
 * count  IN  number of entries in thd to examine
 *
 * Every entry whose state is DSH_ACTIVE and whose tstart lies at least two
 * seconds in the past is sent SIGALRM with pthread_kill(). The result of
 * pthread_kill() is not inspected.
 *
 * RET always 0
 */
static int _check_pending_threads(thd_t *thd, int count)
{
	const time_t current = time(NULL);
	int idx;

	for (idx = 0; idx < count; idx++) {
		thd_t *entry = &thd[idx];

		/* Only threads that are currently running are candidates. */
		if (entry->state != DSH_ACTIVE)
			continue;

		/* Leave threads alone until they are two seconds old. */
		if ((current - entry->tstart) < 2)
			continue;

		pthread_kill(entry->thread, SIGALRM);
	}

	return 0;
}
/*
 * _spawn_launch_thr - start a detached thread that runs _p_launch_task(th).
 *
 * th  IN/OUT  launch-thread record; th->thread receives the new thread ID
 *             and th itself is passed as the thread argument.
 *
 * Failure to initialise the attribute object is fatal. Failures to set the
 * detach state or scope are logged and the launch continues. If
 * pthread_create() itself fails, the task is run synchronously in the
 * calling thread instead.
 */
static void _spawn_launch_thr(thd_t *th)
{
	pthread_attr_t attr;
	int err = 0;

	if ((err = pthread_attr_init(&attr)))
		fatal ("pthread_attr_init: %s", slurm_strerror(err));

	err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
	if (err)
		error ("pthread_attr_setdetachstate: %s", slurm_strerror(err));

#ifdef PTHREAD_SCOPE_SYSTEM
	if ((err = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)))
		error ("pthread_attr_setscope: %s", slurm_strerror(err));
#endif

	err = pthread_create(&th->thread, &attr, _p_launch_task, (void *)th);

	/*
	 * The attribute object is only consulted while the thread is being
	 * created. Release it here, before the (possibly long-running) inline
	 * fallback, so that every pthread_attr_init() is paired with a
	 * pthread_attr_destroy().
	 */
	(void) pthread_attr_destroy(&attr);

	if (err) {
		error ("pthread_create: %s", slurm_strerror(err));
		/* just run it under this thread */
		_p_launch_task((void *) th);
	}

	return;
}
/*
 * _wait_on_active - wait up to one second for a signal on active_cond.
 *
 * thd  IN  array of launch-thread records, forwarded to _check_pending_threads()
 * job  IN  job being launched; job->nhosts is used as the record count
 *
 * active_mutex is handed to pthread_cond_timedwait(), so the caller is
 * expected to hold it (as _p_launch() does). When the wait times out,
 * _check_pending_threads() is run over all thd entries.
 *
 * RET the value returned by pthread_cond_timedwait()
 */
static int _wait_on_active(thd_t *thd, job_t *job)
{
	struct timeval  tv;
	struct timespec deadline;
	int status;

	/* Absolute deadline: one second from the current wall-clock time. */
	gettimeofday(&tv, NULL);
	deadline.tv_nsec = tv.tv_usec * 1000;
	deadline.tv_sec  = tv.tv_sec + 1;

	status = pthread_cond_timedwait(&active_cond, &active_mutex, &deadline);
	if (status != ETIMEDOUT)
		return status;

	/* Nothing finished within the interval: check on the running threads. */
	_check_pending_threads(thd, job->nhosts);
	return status;
}
/*
 * _alrm_handler - signal handler with an intentionally empty body.
 *
 * signo  IN  signal number (unused)
 *
 * _p_launch() installs this for SIGALRM via xsignal().
 */
static void _alrm_handler(int signo)
{
	(void) signo;	/* unused */
}
/*
 * NOTE(review): this span appears to interleave the pre-change and
 * post-change bodies of _p_launch() from a rendered diff (two signatures,
 * and both the thread_ptr/task_info_ptr and the thd/req variants of the
 * same steps). It is not compilable as shown; the comments below describe
 * only what the visible statements do.
 */
/* _p_launch - parallel (multi-threaded) task launcher */
static void _p_launch(slurm_msg_t *req_array_ptr, job_t *job)
static void _p_launch(slurm_msg_t *req, job_t *job)
{
int i;
task_info_t *task_info_ptr;
thd_t *thread_ptr;
thd_t *thd;
int rc = 0;
SigFunc *oldh;
/* Install the empty SIGALRM handler; the previous handler is kept in oldh and restored before returning. */
oldh = xsignal(SIGALRM, (SigFunc *) _alrm_handler);
/*
* Set job timeout to maximum launch time + current time
*/
job->ltimeout = time(NULL) + opt.max_launch_time;
/* One thd_t record per host, indexed by host number. */
thread_ptr = xmalloc (job->nhosts * sizeof (thd_t));
thd = xmalloc (job->nhosts * sizeof (thd_t));
for (i = 0; i < job->nhosts; i++) {
if (job->ntask[i] == 0) { /* No tasks for this node */
debug("Node %s is unused",job->host[i]);
continue;
}
/* Throttle: block until fewer than opt.max_threads launches are in flight. */
pthread_mutex_lock(&active_mutex);
while (active >= opt.max_threads) {
pthread_cond_wait(&active_cond, &active_mutex);
}
/*
 * NOTE(review): _wait_on_active() returns the pthread_cond_timedwait()
 * result, which is 0 or a positive error number, so the `rc < 0` term
 * looks like it can never be true -- confirm.
 */
while (active >= opt.max_threads || rc < 0)
rc = _wait_on_active(thd, job);
active++;
pthread_mutex_unlock(&active_mutex);
task_info_ptr = (task_info_t *)xmalloc(sizeof(task_info_t));
task_info_ptr->req_ptr = &req_array_ptr[i];
task_info_ptr->job_ptr = job;
if (pthread_attr_init (&thread_ptr[i].attr))
fatal ("pthread_attr_init error %m");
if (pthread_attr_setdetachstate (&thread_ptr[i].attr,
PTHREAD_CREATE_DETACHED))
error ("pthread_attr_setdetachstate error %m");
#ifdef PTHREAD_SCOPE_SYSTEM
if (pthread_attr_setscope (&thread_ptr[i].attr,
PTHREAD_SCOPE_SYSTEM))
error ("pthread_attr_setscope error %m");
#endif
/*
 * Stop starting new launches once the job has moved past LAUNCHING.
 * NOTE(review): this break is reached after `active++` above, and no
 * thread is spawned for slot i to decrement it again. Nothing visible
 * here undoes the increment, so the `while (active > 0)` wait after the
 * loop may never finish -- confirm, and consider testing job->state
 * before incrementing active.
 */
if (job->state > SRUN_JOB_LAUNCHING)
break;
if ( pthread_create ( &thread_ptr[i].thread,
&thread_ptr[i].attr,
_p_launch_task,
(void *) task_info_ptr) ) {
error ("pthread_create error %m");
/* just run it under this thread */
_p_launch_task((void *) task_info_ptr);
}
/* Fill in the per-host task arguments and hand the record to a launch thread. */
thd[i].task.req = &req[i];
thd[i].task.job = job;
_spawn_launch_thr(&thd[i]);
}
/* Wait for every in-flight launch to finish before releasing the records. */
pthread_mutex_lock(&active_mutex);
while (active > 0) {
pthread_cond_wait(&active_cond, &active_mutex);
}
while (active > 0)
_wait_on_active(thd, job);
pthread_mutex_unlock(&active_mutex);
xfree(thread_ptr);
xsignal(SIGALRM, oldh);
xfree(thd);
}
static int
......@@ -305,9 +364,12 @@ _send_msg_rc(slurm_msg_t *msg)
static void
_update_failed_node(job_t *j, int id)
{
int i;
pthread_mutex_lock(&j->task_mutex);
if (j->host_state[id] == SRUN_HOST_INIT)
j->host_state[id] = SRUN_HOST_UNREACHABLE;
for (i = 0; i < j->ntask[id]; i++)
j->task_state[j->tids[id][i]] = SRUN_TASK_FAILED;
pthread_mutex_unlock(&j->task_mutex);
/* update_failed_tasks(j, id); */
......@@ -326,27 +388,44 @@ _update_contacted_node(job_t *j, int id)
/*
 * NOTE(review): this span appears to interleave pre-change and post-change
 * lines of _p_launch_task() from a rendered diff (duplicate declarations of
 * tp, req and job), and a hunk marker further down elides part of the body.
 * The comments below describe only the visible statements.
 */
/* _p_launch_task - parallelized launch of a specific task */
static void * _p_launch_task(void *arg)
{
task_info_t *tp = (task_info_t *)arg;
slurm_msg_t *req = tp->req_ptr;
thd_t *th = (thd_t *)arg;
task_info_t *tp = &(th->task);
slurm_msg_t *req = tp->req;
launch_tasks_request_msg_t *msg = req->data;
job_t *job = tp->job_ptr;
job_t *job = tp->job;
int nodeid = msg->srun_node_id;
int failure = 0;
int retry = 3; /* retry thrice */
/*
 * Mark this record as running and stamp its start time; both fields are
 * read by _check_pending_threads().
 * NOTE(review): state is set before tstart and without holding
 * active_mutex, so a concurrent check could observe DSH_ACTIVE together
 * with a not-yet-written tstart -- confirm whether that window matters.
 */
th->state = DSH_ACTIVE;
th->tstart = time(NULL);
if (_verbose)
_print_launch_msg(msg, job->host[nodeid]);
again:
if (_send_msg_rc(req) < 0) { /* Has timeout */
error("launch error on %s: %m", job->host[nodeid]);
if ((errno != ETIMEDOUT) && retry--) {
/*
 * NOTE(review): errno is tested again after this logging call; if the
 * logger can modify errno, the comparisons below would see the wrong
 * value -- confirm, or save errno right after _send_msg_rc().
 */
if (errno != EINTR)
verbose("launch error on %s: %m", job->host[nodeid]);
/* Retry after one second unless the send timed out, the job left LAUNCHING, the credential was rejected, or retries ran out. */
if ((errno != ETIMEDOUT)
&& (job->state == SRUN_JOB_LAUNCHING)
&& (errno != ESLURMD_INVALID_JOB_CREDENTIAL)
&& retry-- ) {
sleep(1);
goto again;
}
/* EINTR is reported as a cancelled launch rather than an error. */
if (errno == EINTR)
verbose("launch on %s canceled", job->host[nodeid]);
else
error("launch error on %s: %m", job->host[nodeid]);
_update_failed_node(job, nodeid);
th->state = DSH_FAILED;
failure = 1;
} else
......@@ -354,12 +433,12 @@ static void * _p_launch_task(void *arg)
/*
 * Completion bookkeeping under active_mutex: drop the in-flight count,
 * add to the failure count, and signal active_cond.
 * NOTE(review): state is set to DSH_DONE here on every visible path, so a
 * DSH_FAILED set above is overwritten -- confirm that is intended.
 */
pthread_mutex_lock(&active_mutex);
th->state = DSH_DONE;
active--;
fail_launch_cnt += failure;
pthread_cond_signal(&active_cond);
pthread_mutex_unlock(&active_mutex);
/*
 * NOTE(review): where arg points at an element of the thd array (th = arg),
 * freeing it here would conflict with the xfree(thd) in _p_launch().
 * Presumably this line belongs to the pre-change side -- confirm.
 */
xfree(arg);
return NULL;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment