diff --git a/src/srun/io.c b/src/srun/io.c index ab2073820d0deeda893a06cbb0cb47a2274e3f92..6c6e38387447d2c412d9165952ba30353db00728 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -64,16 +64,16 @@ typedef struct fd_info { time_t time_last_io; -static void _accept_io_stream(job_t *job, int i); -static int _do_task_output_poll(fd_info_t *info); -static int _handle_pollerr(fd_info_t *info); -static int _close_stream(int *fd, FILE *out, int tasknum); -static int _do_task_output(int *fd, FILE *out, int tasknum); -static void _bcast_stdin(int fd, job_t *job); -static ssize_t _readx(int fd, char *buf, size_t maxbytes); -static ssize_t _readn(int fd, void *buf, size_t nbytes); -static char * _next_line(char **str); -static int _validate_header(slurm_io_stream_header_t *hdr, job_t *job); +static void _accept_io_stream(job_t *job, int i); +static void _bcast_stdin(int fd, job_t *job); +static int _close_stream(int *fd, FILE *out, int tasknum); +static int _do_task_output(int *fd, FILE *out, int tasknum); +static int _do_task_output_poll(fd_info_t *info); +static int _handle_pollerr(fd_info_t *info); +static char * _next_line(char **str); +static ssize_t _readn(int fd, void *buf, size_t nbytes); +static ssize_t _readx(int fd, char *buf, size_t maxbytes); +static int _validate_header(slurm_io_stream_header_t *hdr, job_t *job); #define _poll_set_rd(_pfd, _fd) do { \ (_pfd).fd = _fd; \ @@ -129,7 +129,7 @@ _io_thr_poll(void *job_arg) int numfds = (opt.nprocs*2) + job->niofds + 2; fd_info_t map[numfds]; /* map fd in pollfd array to fd info */ int i, rc; - bool no_io_msg_sent = false; + static bool no_io_msg_sent = false; xassert(job != NULL); @@ -149,6 +149,7 @@ _io_thr_poll(void *job_arg) _poll_set_rd(fds[i], job->iofd[i]); } _poll_set_rd(fds[i], STDIN_FILENO); + time_last_io = time(NULL); while (1) { int eofcnt = 0; @@ -171,14 +172,14 @@ _io_thr_poll(void *job_arg) nfds++; } - if ((job->out[i] == IO_DONE) - && (job->err[i] == IO_DONE)) + if ((job->out[i] == IO_DONE) && + 
(job->err[i] == IO_DONE)) eofcnt++; } debug3("eofcnt == %d", eofcnt); - /* exit if we have received eof on all streams */ + /* exit if we have received EOF on all streams */ if (eofcnt == opt.nprocs) pthread_exit(0); @@ -188,13 +189,13 @@ _io_thr_poll(void *job_arg) if (job->state == SRUN_JOB_FAILED) pthread_exit(0); else if (no_io_msg_sent) - continue; + ; else if (i > MAX_IO_WAIT_SEC) { info("Warning: No I/O in %d seconds", MAX_IO_WAIT_SEC); no_io_msg_sent = true; - continue; } + continue; } switch(errno) { @@ -225,13 +226,15 @@ _io_thr_poll(void *job_arg) if (_poll_rd_isset(fds[i++])) _bcast_stdin(STDIN_FILENO, job); - for (; i < nfds; i++) { + for ( ; i < nfds; i++) { unsigned short revents = fds[i].revents; xassert(!(revents & POLLNVAL)); - if (revents & POLLERR || revents & POLLHUP) + if ((revents & POLLERR) || + (revents & POLLHUP) || + (revents & POLLNVAL)) _handle_pollerr(&map[i]); - if (revents & POLLIN && *map[i].fd > 0) + if ((revents & POLLIN) && (*map[i].fd > 0)) _do_task_output_poll(&map[i]); } } @@ -293,7 +296,7 @@ _accept_io_stream(job_t *job, int i) return; } free_buf(buffer); /* NOTE: this frees msgbuf */ - if (_validate_header(&hdr, job)) + if (_validate_header(&hdr, job)) /* check key */ return; @@ -301,21 +304,18 @@ _accept_io_stream(job_t *job, int i) * sends along some control information */ - /* Do I even need this? */ - fd_set_nonblocking(sd); - if ((hdr.task_id < 0) || (hdr.task_id >= opt.nprocs)) { error ("Invalid task_id %d from %s", hdr.task_id, buf); continue; } + fd_set_nonblocking(sd); if (hdr.type == SLURM_IO_STREAM_INOUT) job->out[hdr.task_id] = sd; else job->err[hdr.task_id] = sd; - /* FIXME: Need to check key */ verbose("accepted %s connection from %s task %ld, sd=%d", (hdr.type ? "stderr" : "stdout"), buf, hdr.task_id, sd ); @@ -331,7 +331,7 @@ _close_stream(int *fd, FILE *out, int tasknum) out == stdout ? 
"stdout" : "stderr"); fflush(out); retval = shutdown(*fd, SHUT_RDWR); - if (retval >= 0 || errno != EBADF) + if ((retval >= 0) || (errno != EBADF)) close(*fd); *fd = IO_DONE; return retval; @@ -370,7 +370,8 @@ _readx(int fd, char *buf, size_t maxbytes) if ((n = read(fd, (void *) buf, maxbytes)) < 0) { if (errno == EINTR) goto again; - if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) + if ((errno == EAGAIN) || + (errno == EWOULDBLOCK)) return 0; error("readx fd %d: %m", fd, n); return -1; /* shutdown socket, cleanup. */ diff --git a/src/srun/job.c b/src/srun/job.c index 8fbff4fec25ced8039620b5be489d6d50f66fc94..ddd621ed62154102649efec0fdc4d1c63abf4682 100644 --- a/src/srun/job.c +++ b/src/srun/job.c @@ -78,7 +78,8 @@ job_create(resource_allocation_response_msg_t *resp) job->nhosts = hostlist_count(hl); job->host = (char **) xmalloc(job->nhosts * sizeof(char *)); - job->slurmd_addr = (slurm_addr *) xmalloc(job->nhosts * sizeof(slurm_addr)); + job->slurmd_addr = (slurm_addr *) xmalloc(job->nhosts * + sizeof(slurm_addr)); job->cpus = (int *) xmalloc(job->nhosts * sizeof(int) ); job->ntask = (int *) xmalloc(job->nhosts * sizeof(int) ); @@ -116,7 +117,8 @@ job_create(resource_allocation_response_msg_t *resp) pthread_mutex_init(&job->task_mutex, NULL); ntask = opt.nprocs; - tph = (ntask+job->nhosts-1) / job->nhosts; /* tasks per host, round up */ + tph = (ntask+job->nhosts-1) / + job->nhosts; /* tasks per host, round up */ for(i = 0; i < job->nhosts; i++) { job->host[i] = hostlist_shift(hl); @@ -133,7 +135,8 @@ job_create(resource_allocation_response_msg_t *resp) cpu_cnt = 0; } memcpy(&job->slurmd_addr[i], - &resp->node_addr[i], sizeof(job->slurmd_addr[i])); + &resp->node_addr[i], + sizeof(job->slurmd_addr[i])); } else { job->cpus[i] = tph; slurm_set_addr (&job->slurmd_addr[i], diff --git a/src/srun/msg.c b/src/srun/msg.c index 1404a0e15efccbca9341752321533a7b17fd372b..eed186b207571c092f379e5332703a41caf80775 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -33,8 +33,9 
@@ #endif #include <errno.h> -#include <sys/poll.h> #include <fcntl.h> +#include <sys/poll.h> +#include <time.h> #include "src/common/fd.h" #include "src/common/log.h" @@ -52,9 +53,21 @@ #include "src/srun/attach.h" #endif +#define MAX_MSG_WAIT_SEC 60 /* max msg idle, secs, confirm launches */ +#define POLL_TIMEOUT_MSEC 500 +time_t time_last_msg; + static int tasks_exited = 0; static uint32_t slurm_user_id; +static void _accept_msg_connection(job_t *job, int fdnum); +static void _confirm_launch_complete(job_t *job); +static void _exit_handler(job_t *job, slurm_msg_t *exit_msg); +static void _handle_msg(job_t *job, slurm_msg_t *msg); +static void _launch_handler(job_t *job, slurm_msg_t *resp); +static void _msg_thr_poll(job_t *job); +static void _set_jfds_nonblocking(job_t *job); + #define _poll_set_rd(_pfd, _fd) do { \ (_pfd).fd = _fd; \ (_pfd).events = POLLIN; \ @@ -69,7 +82,6 @@ static uint32_t slurm_user_id; #define _poll_wr_isset(pfd) ((pfd).revents & POLLOUT) #define _poll_err(pfd) ((pfd).revents & POLLERR) -#define POLL_TIMEOUT_MSEC 500 static void _launch_handler(job_t *job, slurm_msg_t *resp) @@ -108,6 +120,24 @@ _launch_handler(job_t *job, slurm_msg_t *resp) } +/* _confirm_launch_complete + * confirm that every host is in state SRUN_HOST_REPLIED; + * otherwise log the host, mark the job failed and exit this thread */ +static void +_confirm_launch_complete(job_t *job) +{ + int i; + + for (i=0; i<job->nhosts; i++) { + if (job->host_state[i] != SRUN_HOST_REPLIED) { + error ("Node %s not responding, terminiating job step", + job->host[i]); + update_job_state(job, SRUN_JOB_FAILED); + pthread_exit(0); + } + } +} + static void _exit_handler(job_t *job, slurm_msg_t *exit_msg) { @@ -214,6 +244,7 @@ _msg_thr_poll(job_t *job) struct pollfd *fds; nfds_t nfds = job->njfds; int i, rc; + static bool check_launch_msg_sent = false; fds = xmalloc(job->njfds * sizeof(*fds)); @@ -221,14 +252,21 @@ _msg_thr_poll(job_t *job) for (i = 0; i < job->njfds; i++) _poll_set_rd(fds[i], job->jfd[i]); + time_last_msg = time(NULL); while 
(1) { while ((rc = poll(fds, nfds, POLL_TIMEOUT_MSEC)) <= 0) { if (rc == 0) { /* timeout */ + i = time(NULL)-time_last_msg; if (job->state == SRUN_JOB_FAILED) pthread_exit(0); - else - continue; + else if (check_launch_msg_sent) + ; + else if (i > MAX_MSG_WAIT_SEC) { + _confirm_launch_complete(job); + check_launch_msg_sent = true; + } + continue; } switch (errno) { @@ -243,9 +281,12 @@ _msg_thr_poll(job_t *job) } } + time_last_msg = time(NULL); for (i = 0; i < job->njfds; i++) { unsigned short revents = fds[i].revents; - if (revents & POLLERR) + if ((revents & POLLERR) || + (revents & POLLHUP) || + (revents & POLLNVAL)) error("poll error on jfd %d: %m", fds[i].fd); else if (revents & POLLIN) _accept_msg_connection(job, i); @@ -264,49 +305,3 @@ msg_thr(void *arg) _msg_thr_poll(job); return (void *)1; } - -void * -_msg_thr_one(void *arg) -{ - job_t *job = (job_t *) arg; - slurm_fd fd; - slurm_fd newfd; - slurm_msg_t *msg = NULL; - slurm_addr cli_addr; - char addrbuf[256]; - - xassert(job != NULL); - - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); - - fd = job->jfd[0]; - - while (1) { - - if ((newfd = slurm_accept_msg_conn(fd, &cli_addr)) < 0) { - error("_msg_thr_one/slurm_accept_msg_conn: %m"); - break; - } - - slurm_print_slurm_addr(&cli_addr, addrbuf, 256); - debug2("got message connection from %s", addrbuf); - - - msg = xmalloc(sizeof(*msg)); - if (slurm_receive_msg(newfd, msg) == SLURM_SOCKET_ERROR) { - error("_msg_thr_one/slurm_receive_msg: %m"); - slurm_close_accepted_conn(newfd); - break; - } - - msg->conn_fd = newfd; - _handle_msg(job, msg); - slurm_close_accepted_conn(newfd); - } - - /* reached only on receive error */ - return (void *)(0); -} - -