diff --git a/src/common/global_srun.h b/src/common/global_srun.h
index 89bb7667b9d1f3412b9cbc7be4988175ec9e0b24..bc6eba242c3c56e26b265a9da636fa3e8cb41b05 100644
--- a/src/common/global_srun.h
+++ b/src/common/global_srun.h
@@ -82,7 +82,8 @@ typedef enum {
 	PIPE_SIGNALED,
 	PIPE_MPIR_DEBUG_STATE,
 	PIPE_UPDATE_MPIR_PROCTABLE,
-	PIPE_UPDATE_STEP_LAYOUT
+	PIPE_UPDATE_STEP_LAYOUT,
+	PIPE_NODE_FAIL
 } pipe_enum_t;
 
 /* For Message thread */
diff --git a/src/srun/msg.c b/src/srun/msg.c
index 90020d8ad6030c5a22207e7dcdd3e651b17f7dcf..65afd2de8df49fc54e5f593f6226ab334ea4a5fb 100644
--- a/src/srun/msg.c
+++ b/src/srun/msg.c
@@ -103,7 +103,8 @@ static void _msg_thr_poll(srun_job_t *job);
 static void _set_jfds_nonblocking(srun_job_t *job);
 static void _print_pid_list(const char *host, int ntasks,
 			    uint32_t *pid, char *executable_name);
-static void _node_fail_handler(char *nodelist, srun_job_t *job);
+static void _node_fail_handler(int fd, srun_job_t *job);
+static void _node_fail_forwarder(char *nodelist, srun_job_t *job);
 
 #define _poll_set_rd(_pfd, _fd) do { 	\
 	(_pfd).fd = _fd;		\
@@ -341,17 +342,104 @@ void timeout_handler(time_t timeout)
  * not. The job will continue to execute given the --no-kill option.
  * Otherwise all of the job's tasks and the job itself are killed..
  */
-static void _node_fail_handler(char *nodelist, srun_job_t *job)
+static void _node_fail_handler(int fd, srun_job_t *job)
 {
-	if ( (opt.no_kill) ) {
-		error("Node failure on %s, eliminated that node", nodelist);
-		return;
+	char *nodelist = NULL;
+	int len = 0;
+	hostset_t fail_nodes, all_nodes;
+	hostlist_iterator_t fail_itr;
+	char *node;
+	int num_node_ids;
+	int *node_ids;
+	int i, j;
+	int node_id, num_tasks;
+
+	/* get the hostlist string of failed nodes from the message thread */
+	safe_read(fd, &len, sizeof(int));
+	nodelist = (char *)xmalloc(len+1);
+	safe_read(fd, nodelist, len);
+	nodelist[len] = '\0';
+
+	/* now process the down nodes and tell the IO client about them */
+	fail_nodes = hostset_create(nodelist);
+	fail_itr = hostset_iterator_create(fail_nodes);
+	num_node_ids = hostset_count(fail_nodes);
+	node_ids = xmalloc(sizeof(int) * num_node_ids);
+
+	all_nodes = hostset_create(job->step_layout->node_list);
+	/* find the index number of each down node */
+	slurm_mutex_lock(&job->task_mutex);
+	for (i = 0; i < num_node_ids; i++) {
+		node = hostlist_next(fail_itr);
+		node_id = node_ids[i] = hostset_find(all_nodes, node);
+		if (job->host_state[node_id] != SRUN_HOST_UNREACHABLE) {
+			error("Node failure: %s.", node);
+			job->host_state[node_id] = SRUN_HOST_UNREACHABLE;
+		}
+		free(node);
+
+		/* find all of the tasks that should run on this failed node
+		 * and mark them as having failed.
+		 */
+		num_tasks = job->step_layout->tasks[node_id];
+		for (j = 0; j < num_tasks; j++) {
+			int gtaskid;
+			debug2("marking task %d done on failed node %d",
+			       job->step_layout->tids[node_id][j], node_id);
+			gtaskid = job->step_layout->tids[node_id][j];
+			job->task_state[gtaskid] = SRUN_TASK_FAILED;
+		}
 	}
+	slurm_mutex_unlock(&job->task_mutex);
 
-	error("Node failure on %s, killing job", nodelist);
-	update_job_state(job, SRUN_JOB_FORCETERM);
-	info("sending Ctrl-C to remaining tasks");
-	fwd_signal(job, SIGINT, opt.max_threads);
+	client_io_handler_downnodes(job->client_io, node_ids, num_node_ids);
+
+	/* done with the node lookup structures; avoid leaking them */
+	hostlist_iterator_destroy(fail_itr);
+	hostset_destroy(fail_nodes);
+	hostset_destroy(all_nodes);
+	xfree(node_ids);
+
+	if (!opt.no_kill) {
+		update_job_state(job, SRUN_JOB_FORCETERM);
+		info("sending SIGINT to remaining tasks");
+		fwd_signal(job, SIGINT, opt.max_threads);
+	}
+
+	xfree(nodelist);
+	return;
+rwfail:
+	error("Failure reading node failure message from message process: %m");
+	if (nodelist != NULL)
+		xfree(nodelist);
+	return;
+}
+
+/*
+ * Forward the node failure message to the main srun process.
+ *
+ * NOTE: this is called from the forked message handling process
+ */
+static void _node_fail_forwarder(char *nodelist, srun_job_t *job)
+{
+	pipe_enum_t pipe_enum = PIPE_NODE_FAIL;
+	int dummy = 0xdeadbeef;
+	int pipe_fd = job->forked_msg->par_msg->msg_pipe[1];
+	int len;
+
+	len = strlen(nodelist);
+	if (message_thread) {
+		safe_write(pipe_fd, &pipe_enum, sizeof(int));
+		safe_write(pipe_fd, &dummy, sizeof(int));
+
+		/* the following writes are read by _node_fail_handler */
+		safe_write(pipe_fd, &len, sizeof(int));
+		safe_write(pipe_fd, nodelist, len);
+	}
+	return;
+rwfail:
+	error("Failure sending node failure message to main process: %m");
+	return;
 }
 
 static bool _job_msg_done(srun_job_t *job)
@@ -862,7 +944,7 @@ _handle_msg(srun_job_t *job, slurm_msg_t *msg)
 	case SRUN_NODE_FAIL:
 		verbose("node_fail received");
 		nf = msg->data;
-		_node_fail_handler(nf->nodelist, job);
+		_node_fail_forwarder(nf->nodelist, job);
 		slurm_free_srun_node_fail_msg(msg->data);
 		break;
 	case RESPONSE_RESOURCE_ALLOCATION:
@@ -1178,6 +1260,9 @@ par_thr(void *arg)
 			_handle_update_step_layout(par_msg->msg_pipe[0],
 						   job->step_layout);
 			break;
+		case PIPE_NODE_FAIL:
+			_node_fail_handler(par_msg->msg_pipe[0], job);
+			break;
 		default:
 			error("Unrecognized message from message thread %d",
 			      type);