Skip to content
Snippets Groups Projects
Commit 2768d28a authored by Christopher J. Morrone's avatar Christopher J. Morrone
Browse files

svn merge -r9609:9622 https://eris.llnl.gov/svn/slurm/branches/slurm-1.1

Better handling of srun node-fail notification.
parent df9bde9b
No related branches found
No related tags found
No related merge requests found
...@@ -82,7 +82,8 @@ typedef enum { ...@@ -82,7 +82,8 @@ typedef enum {
PIPE_SIGNALED, PIPE_SIGNALED,
PIPE_MPIR_DEBUG_STATE, PIPE_MPIR_DEBUG_STATE,
PIPE_UPDATE_MPIR_PROCTABLE, PIPE_UPDATE_MPIR_PROCTABLE,
PIPE_UPDATE_STEP_LAYOUT PIPE_UPDATE_STEP_LAYOUT,
PIPE_NODE_FAIL
} pipe_enum_t; } pipe_enum_t;
/* For Message thread */ /* For Message thread */
......
...@@ -103,7 +103,8 @@ static void _msg_thr_poll(srun_job_t *job); ...@@ -103,7 +103,8 @@ static void _msg_thr_poll(srun_job_t *job);
static void _set_jfds_nonblocking(srun_job_t *job); static void _set_jfds_nonblocking(srun_job_t *job);
static void _print_pid_list(const char *host, int ntasks, static void _print_pid_list(const char *host, int ntasks,
uint32_t *pid, char *executable_name); uint32_t *pid, char *executable_name);
static void _node_fail_handler(char *nodelist, srun_job_t *job); static void _node_fail_handler(int fd, srun_job_t *job);
static void _node_fail_forwarder(char *nodelist, srun_job_t *job);
#define _poll_set_rd(_pfd, _fd) do { \ #define _poll_set_rd(_pfd, _fd) do { \
(_pfd).fd = _fd; \ (_pfd).fd = _fd; \
...@@ -341,17 +342,98 @@ void timeout_handler(time_t timeout) ...@@ -341,17 +342,98 @@ void timeout_handler(time_t timeout)
* not. The job will continue to execute given the --no-kill option. * not. The job will continue to execute given the --no-kill option.
* Otherwise all of the job's tasks and the job itself are killed.. * Otherwise all of the job's tasks and the job itself are killed..
*/ */
static void _node_fail_handler(char *nodelist, srun_job_t *job) static void _node_fail_handler(int fd, srun_job_t *job)
{ {
if ( (opt.no_kill) ) { char *nodelist = NULL;
error("Node failure on %s, eliminated that node", nodelist); int len = 0;
return; hostset_t fail_nodes, all_nodes;
hostlist_iterator_t fail_itr;
char *node;
int num_node_ids;
int *node_ids;
int i, j;
int node_id, num_tasks;
/* get the hostlist string of failed nodes from the message thread */
safe_read(fd, &len, sizeof(int));
nodelist = (char *)xmalloc(len+1);
safe_read(fd, nodelist, len);
nodelist[len] = '\0';
/* now process the down nodes and tell the IO client about them */
fail_nodes = hostset_create(nodelist);
fail_itr = hostset_iterator_create(fail_nodes);
num_node_ids = hostset_count(fail_nodes);
node_ids = xmalloc(sizeof(int) * num_node_ids);
all_nodes = hostset_create(job->step_layout->node_list);
/* find the index number of each down node */
slurm_mutex_lock(&job->task_mutex);
for (i = 0; i < num_node_ids; i++) {
node = hostlist_next(fail_itr);
node_id = node_ids[i] = hostset_find(all_nodes, node);
if (job->host_state[node_id] != SRUN_HOST_UNREACHABLE) {
error("Node failure: %s.", node);
job->host_state[node_id] = SRUN_HOST_UNREACHABLE;
}
free(node);
/* find all of the tasks that should run on this failed node
* and mark them as having failed.
*/
num_tasks = job->step_layout->tasks[node_id];
for (j = 0; j < num_tasks; j++) {
int gtaskid;
debug2("marking task %d done on failed node %d",
job->step_layout->tids[node_id][j], node_id);
gtaskid = job->step_layout->tids[node_id][j];
job->task_state[gtaskid] = SRUN_TASK_FAILED;
}
} }
slurm_mutex_unlock(&job->task_mutex);
error("Node failure on %s, killing job", nodelist); client_io_handler_downnodes(job->client_io, node_ids, num_node_ids);
update_job_state(job, SRUN_JOB_FORCETERM);
info("sending Ctrl-C to remaining tasks"); if (!opt.no_kill) {
fwd_signal(job, SIGINT, opt.max_threads); update_job_state(job, SRUN_JOB_FORCETERM);
info("sending SIGINT to remaining tasks");
fwd_signal(job, SIGINT, opt.max_threads);
}
xfree(nodelist);
return;
rwfail:
error("Failure reading node failure message from message process: %m");
if (nodelist != NULL)
xfree(nodelist);
return;
}
/*
* Forward the node failure message to the main srun process.
*
* NOTE: this is called from the forked message handling process
*/
static void _node_fail_forwarder(char *nodelist, srun_job_t *job)
{
pipe_enum_t pipe_enum = PIPE_NODE_FAIL;
int dummy = 0xdeadbeef;
int pipe_fd = job->forked_msg->par_msg->msg_pipe[1];
int len;
len = strlen(nodelist);
if (message_thread) {
safe_write(pipe_fd, &pipe_enum, sizeof(int));
safe_write(pipe_fd, &dummy, sizeof(int));
/* the following writes are handled by _node_fail_handler */
safe_write(pipe_fd, &len, sizeof(int));
safe_write(pipe_fd, nodelist, len);
}
return;
rwfail:
error("Failure sending node failure message to main process: %m");
return;
} }
static bool _job_msg_done(srun_job_t *job) static bool _job_msg_done(srun_job_t *job)
...@@ -862,7 +944,7 @@ _handle_msg(srun_job_t *job, slurm_msg_t *msg) ...@@ -862,7 +944,7 @@ _handle_msg(srun_job_t *job, slurm_msg_t *msg)
case SRUN_NODE_FAIL: case SRUN_NODE_FAIL:
verbose("node_fail received"); verbose("node_fail received");
nf = msg->data; nf = msg->data;
_node_fail_handler(nf->nodelist, job); _node_fail_forwarder(nf->nodelist, job);
slurm_free_srun_node_fail_msg(msg->data); slurm_free_srun_node_fail_msg(msg->data);
break; break;
case RESPONSE_RESOURCE_ALLOCATION: case RESPONSE_RESOURCE_ALLOCATION:
...@@ -1178,6 +1260,9 @@ par_thr(void *arg) ...@@ -1178,6 +1260,9 @@ par_thr(void *arg)
_handle_update_step_layout(par_msg->msg_pipe[0], _handle_update_step_layout(par_msg->msg_pipe[0],
job->step_layout); job->step_layout);
break; break;
case PIPE_NODE_FAIL:
_node_fail_handler(par_msg->msg_pipe[0], job);
break;
default: default:
error("Unrecognized message from message thread %d", error("Unrecognized message from message thread %d",
type); type);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment