diff --git a/src/srun/io.c b/src/srun/io.c index 409bf1a26833d71b60c263f09ca065a446de2707..ea86b82b6a0c4ffa42b9ae9f54904239b7080f52 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -61,6 +61,7 @@ typedef struct fd_info { int taskid; /* corresponding task id */ int *fd; /* pointer to fd in job->out/err array */ FILE *fp; /* fp on which to write output */ + cbuf_t buf; } fd_info_t; static time_t time_first_done = 0; @@ -71,11 +72,10 @@ static void _accept_io_stream(job_t *job, int i); static void _bcast_stdin(int fd, job_t *job); static int _close_stream(int *fd, FILE *out, int tasknum); static void _do_poll_timeout(job_t *job); -static int _do_task_output(int *fd, FILE *out, int tasknum); +static int _do_task_output(int *fd, FILE *out, cbuf_t buf, int tasknum); static int _do_task_output_poll(fd_info_t *info); static int _handle_pollerr(fd_info_t *info); static char * _host_state_name(host_state_t state_inx); -static char * _next_line(char **str); static ssize_t _readn(int fd, void *buf, size_t nbytes); static ssize_t _readx(int fd, char *buf, size_t maxbytes); static char * _task_state_name(task_state_t state_inx); @@ -98,7 +98,7 @@ static int _validate_header(slurm_io_stream_header_t *hdr, job_t *job); static int _do_task_output_poll(fd_info_t *info) { - return _do_task_output(info->fd, info->fp, info->taskid); + return _do_task_output(info->fd, info->fp, info->buf, info->taskid); } static int @@ -166,6 +166,7 @@ _io_thr_poll(void *job_arg) map[nfds].taskid = i; map[nfds].fd = &job->out[i]; map[nfds].fp = stdout; + map[nfds].buf = job->outbuf[i]; nfds++; } @@ -174,6 +175,7 @@ _io_thr_poll(void *job_arg) map[nfds].taskid = i; map[nfds].fd = &job->err[i]; map[nfds].fp = stderr; + map[nfds].buf = job->errbuf[i]; nfds++; } @@ -193,12 +195,11 @@ _io_thr_poll(void *job_arg) time_first_done = time(NULL); } - while ((rc = poll(fds, nfds, POLL_TIMEOUT_MSEC)) <= 0) { + while ((rc = poll(fds, nfds, POLL_TIMEOUT_MSEC)) < 0) { if (rc == 0) { /* timeout */ _do_poll_timeout(job); continue; } - switch(errno) { case EINTR: case EAGAIN: @@ -231,8 +232,8 @@ _io_thr_poll(void *job_arg) unsigned short revents = fds[i].revents; xassert(!(revents & POLLNVAL)); if ((revents & POLLERR) || - (revents & POLLHUP) || - (revents & POLLNVAL)) + (revents & POLLHUP) || + (revents & POLLNVAL)) _handle_pollerr(&map[i]); if ((revents & POLLIN) && (*map[i].fd > 0)) @@ -353,6 +354,30 @@ io_thr(void *arg) return _io_thr_poll(arg); } +int +io_thr_create(job_t *job) +{ + int i; + pthread_attr_t attr; + + for (i = 0; i < job->niofds; i++) { + if (net_stream_listen(&job->iofd[i], &job->ioport[i]) < 0) + fatal("unable to initialize stdio server port: %m"); + debug("initialized stdio server port %d\n", + ntohs(job->ioport[i])); + net_set_low_water(job->iofd[i], 140); + } + + pthread_attr_init(&attr); + if ((errno = pthread_create(&job->ioid, &attr, &io_thr, (void *) job))) + fatal("Unable to create io thread: %m"); + + debug("Started IO server thread (%d)", job->ioid); + + return SLURM_SUCCESS; +} + + static void _accept_io_stream(job_t *job, int i) { @@ -409,18 +434,15 @@ _accept_io_stream(job_t *job, int i) /* Assign new fds arbitrarily for now, until slurmd * sends along some control information */ - + if ((hdr.task_id < 0) || (hdr.task_id >= opt.nprocs)) { - error ("Invalid task_id %d from %s", - hdr.task_id, buf); + error ("Invalid task_id %d from %s", hdr.task_id, buf); continue; - } else { - pthread_mutex_lock(&job->task_mutex); - if (job->task_state[hdr.task_id] == SRUN_TASK_INIT) - job->task_state[hdr.task_id] = - SRUN_TASK_RUNNING; - pthread_mutex_unlock(&job->task_mutex); - } + } + + slurm_mutex_lock(&job->task_mutex); + job->task_state[hdr.task_id] = SRUN_TASK_RUNNING; + slurm_mutex_unlock(&job->task_mutex); fd_set_nonblocking(sd); if (hdr.type == SLURM_IO_STREAM_INOUT) @@ -450,23 +472,21 @@ _close_stream(int *fd, FILE *out, int tasknum) } static int -_do_task_output(int *fd, FILE *out, int tasknum) +_do_task_output(int *fd, FILE *out, cbuf_t buf, int tasknum) { - char buf[IO_BUFSIZ+1]; - char *line, *p; + char line[IO_BUFSIZ]; int len = 0; + int dropped = 0; - if ((len = _readx(*fd, buf, IO_BUFSIZ)) <= 0) { + if ((len = cbuf_write_from_fd(buf, *fd, -1, &dropped)) <= 0) { _close_stream(fd, out, tasknum); return len; } - buf[IO_BUFSIZ] = '\0'; - p = buf; - while ((line = _next_line(&p)) != NULL) { + while ((len = cbuf_read_line(buf, line, sizeof(line), 1))) { if (opt.labelio) fprintf(out, "%d: ", tasknum); - fprintf(out, "%s\n", line); + fputs(line, out); fflush(out); } @@ -495,31 +515,6 @@ _readx(int fd, char *buf, size_t maxbytes) } -/* return a pointer to the beginning of the next line in *str and - * nullify newline character. *str will be pointed just past - * nullified '\n' - */ -static char * -_next_line(char **str) -{ - char *p, *line; - xassert(*str != NULL); - - if (**str == '\0') - return NULL; - - line = *str; - if ((p = strchr(*str, '\n')) != NULL) { - *p = '\0'; - *str = p+1; - } else { - *str += strlen(*str) - 1; - **str = '\0'; - } - - return line; -} - ssize_t _readn(int fd, void *buf, size_t nbytes) { diff --git a/src/srun/job.c b/src/srun/job.c index 0e56d213c157947d05cb770bed73cbbd4ada53e8..0ee8af349f2ccad55d5c81d806da54741c206506 100644 --- a/src/srun/job.c +++ b/src/srun/job.c @@ -33,6 +33,7 @@ #include "src/common/slurm_protocol_api.h" #include "src/common/xmalloc.h" #include "src/common/xstring.h" +#include "src/common/cbuf.h" #include "src/srun/job.h" #include "src/srun/opt.h" @@ -102,8 +103,16 @@ job_create(resource_allocation_response_msg_t *resp) job->ioport = (int *) xmalloc(job->niofds * sizeof(int)); /* ntask stdout and stderr fds */ - job->out = (int *) xmalloc(opt.nprocs * sizeof(int)); - job->err = (int *) xmalloc(opt.nprocs * sizeof(int)); + job->out = (int *) xmalloc(opt.nprocs * sizeof(int)); + job->err = (int *) xmalloc(opt.nprocs * sizeof(int)); + + /* ntask cbufs for stdout and stderr */ + job->outbuf = (cbuf_t *) xmalloc(opt.nprocs * sizeof(cbuf_t)); + job->errbuf = (cbuf_t *) xmalloc(opt.nprocs * sizeof(cbuf_t)); + for (i = 0; i < opt.nprocs; i++) { + job->outbuf[i] = cbuf_create(1024, 10240); + job->errbuf[i] = cbuf_create(1024, 10240); + } /* nhost host states */ job->host_state = diff --git a/src/srun/job.h b/src/srun/job.h index 0045aee900675de76a20ed400eeb017d7fc2cd19..b2c72a6b088ec75f34b786b0bcef48c616cc9f93 100644 --- a/src/srun/job.h +++ b/src/srun/job.h @@ -10,6 +10,7 @@ #include <netinet/in.h> #include "src/common/macros.h" +#include "src/common/cbuf.h" #include "src/api/slurm.h" typedef enum { @@ -68,6 +69,12 @@ typedef struct srun_job { int *out; /* ntask stdout fds */ int *err; /* ntask stderr fds */ + /* XXX Need long term solution here: + * Quickfix: ntask*2 cbufs for buffering job output + */ + cbuf_t *outbuf; + cbuf_t *errbuf; + pthread_t lid; /* launch thread id */ host_state_t *host_state; /* nhost host states */