diff --git a/src/srun/io.c b/src/srun/io.c index d2deca91c21a59388da52393604e8c95f0e95c3a..6c184fc7d416f096dab53af5cacd6912c2ebcc78 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -21,6 +21,7 @@ #include "net.h" #define IO_BUFSIZ 2048 +#define IO_DONE -9 /* signify that eof has been recvd on stream */ static void _accept_io_stream(job_t *job); static int _handle_task_output(int *fd, FILE *out, int tasknum); @@ -48,6 +49,7 @@ void *io_thr(void *job_arg) } while (1) { + unsigned long eofcnt = 0; FD_ZERO(&rset); FD_ZERO(&wset); @@ -63,11 +65,17 @@ void *io_thr(void *job_arg) FD_SET(job->out[i], &rset); if (job->err[i] > 0) FD_SET(job->err[i], &rset); + if (job->out[i] == IO_DONE && job->err[i] == IO_DONE) + eofcnt++; } + /* exit if we have recieved eof on all streams */ + if (eofcnt == opt.nprocs) + pthread_exit(0); + tv.tv_sec = 0; tv.tv_usec = 500; - while ((m = select(maxfd+1, &rset, NULL, NULL, NULL)) < 0) { + while ((m = select(maxfd+1, &rset, NULL, NULL, &tv)) < 0) { if (errno != EINTR) fatal("Unable to handle I/O: %m", errno); } @@ -148,10 +156,10 @@ _handle_task_output(int *fd, FILE *out, int tasknum) if ((len = _readx(*fd, buf, IO_BUFSIZ)) <= 0) { debug("%d: <%s disconnected>", tasknum, out == stdout ? "stdout" : "stderr"); - fflush(stderr); + fflush(out); shutdown(*fd, SHUT_RDWR); close(*fd); - *fd = -1; + *fd = IO_DONE; } p = buf;