diff --git a/src/common/cbuf.c b/src/common/cbuf.c index 2b66a512312472ebc59440290a785bbebd233f09..9aca5a9bc4dbcc84a7a6bbaa6e875473b73b63c8 100644 --- a/src/common/cbuf.c +++ b/src/common/cbuf.c @@ -1,9 +1,9 @@ /***************************************************************************** * $Id$ ***************************************************************************** - * $LSDId: cbuf.c,v 1.30 2002/12/04 02:58:03 dun Exp $ + * $LSDId: cbuf.c,v 1.32 2003/01/03 21:08:19 dun Exp $ ***************************************************************************** - * Copyright (C) 2002 The Regents of the University of California. + * Copyright (C) 2002-2003 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). * Written by Chris Dunlap <cdunlap@llnl.gov>. * @@ -121,8 +121,8 @@ typedef int (*cbuf_iof) (void *cbuf_data, void *arg, int len); * Prototypes * ****************/ -static int cbuf_find_replay_line (cbuf_t cb, int chars, int lines, int *nl); -static int cbuf_find_unread_line (cbuf_t cb, int chars, int lines); +static int cbuf_find_replay_line (cbuf_t cb, int chars, int *nlines, int *nl); +static int cbuf_find_unread_line (cbuf_t cb, int chars, int *nlines); static int cbuf_get_fd (void *dstbuf, int *psrcfd, int len); static int cbuf_get_mem (void *dstbuf, unsigned char **psrcbuf, int len); @@ -364,6 +364,20 @@ cbuf_used (cbuf_t cb) } +int +cbuf_lines_used (cbuf_t cb) +{ + int lines = -1; + + assert(cb != NULL); + cbuf_mutex_lock(cb); + assert(cbuf_is_valid(cb)); + cbuf_find_unread_line(cb, cb->size, &lines); + cbuf_mutex_unlock(cb); + return(lines); +} + + int cbuf_reused (cbuf_t cb) { @@ -378,6 +392,20 @@ cbuf_reused (cbuf_t cb) } +int +cbuf_lines_reused (cbuf_t cb) +{ + int lines = -1; + + assert(cb != NULL); + cbuf_mutex_lock(cb); + assert(cbuf_is_valid(cb)); + cbuf_find_replay_line(cb, cb->size, &lines, NULL); + cbuf_mutex_unlock(cb); + return(lines); +} + + int cbuf_is_empty (cbuf_t cb) { @@ -626,7 +654,7 @@ cbuf_drop_line (cbuf_t src, int len, int lines) cbuf_mutex_lock(src); assert(cbuf_is_valid(src)); - n = cbuf_find_unread_line(src, len, lines); + n = cbuf_find_unread_line(src, len, &lines); if (n > 0) { cbuf_dropper(src, n); } @@ -653,7 +681,7 @@ cbuf_peek_line (cbuf_t src, char *dstbuf, int len, int lines) } cbuf_mutex_lock(src); assert(cbuf_is_valid(src)); - n = cbuf_find_unread_line(src, len - 1, lines); + n = cbuf_find_unread_line(src, len - 1, &lines); if (n > 0) { if (len > 0) { m = MIN(n, len - 1); @@ -689,7 +717,7 @@ cbuf_read_line (cbuf_t src, char *dstbuf, int len, int lines) } cbuf_mutex_lock(src); assert(cbuf_is_valid(src)); - n = cbuf_find_unread_line(src, len - 1, lines); + n = cbuf_find_unread_line(src, len - 1, &lines); if (n > 0) { if (len > 0) { m = MIN(n, len - 1); @@ -727,7 +755,7 @@ cbuf_replay_line (cbuf_t src, char *dstbuf, int len, int lines) } cbuf_mutex_lock(src); assert(cbuf_is_valid(src)); - n = cbuf_find_replay_line(src, len - 1, lines, &nl); + n = cbuf_find_replay_line(src, len - 1, &lines, &nl); if (n > 0) { if (len > 0) { assert((nl == 0) || (nl == 1)); @@ -771,7 +799,7 @@ cbuf_rewind_line (cbuf_t src, int len, int lines) cbuf_mutex_lock(src); assert(cbuf_is_valid(src)); - n = cbuf_find_replay_line(src, len, lines, NULL); + n = cbuf_find_replay_line(src, len, &lines, NULL); if (n > 0) { src->used += n; src->i_out = (src->i_out - n + (src->size + 1)) % (src->size + 1); @@ -1079,24 +1107,31 @@ cbuf_move (cbuf_t src, cbuf_t dst, int len, int *ndropped) static int -cbuf_find_replay_line (cbuf_t cb, int chars, int lines, int *nl) +cbuf_find_replay_line (cbuf_t cb, int chars, int *nlines, int *nl) { /* Finds the specified number of lines from the replay region of the buffer. - * If ([lines] > 0), returns the number of bytes comprising the line count - * specified by [lines], or 0 if this number is not found (ie, all or none). - * If ([lines] == -1), returns the number of bytes comprising the maximum line - * count contained within the number of characters specified by [chars]. + * If ([nlines] > 0), returns the number of bytes comprising the line count, + * or 0 if this number of lines is not available (ie, all or none). + * If ([nlines] == -1), returns the number of bytes comprising the maximum + * line count bounded by the number of characters specified by [chars]. * Only complete lines (ie, those terminated by a newline) are counted, * with once exception: the most recent line of replay data is treated * as a complete line regardless of the presence of a terminating newline. + * Sets the value-result parameter [nlines] to the number of lines found. * Sets [nl] to '1' if a newline is required to terminate the replay data. */ - int i, n, m; + int i, n, m, l; + int lines; assert(cb != NULL); - assert(lines >= -1); + assert(nlines != NULL); + assert(*nlines >= -1); assert(cbuf_mutex_is_locked(cb)); + n = m = l = 0; + lines = *nlines; + *nlines = 0; + if (nl) { *nl = 0; /* init in case of early return */ } @@ -1123,10 +1158,12 @@ cbuf_find_replay_line (cbuf_t cb, int chars, int lines, int *nl) } --chars; } - else if (lines > 0) { - ++lines; + else { + if (lines > 0) { + ++lines; + } + --l; } - n = m = 0; i = cb->i_out; while (i != cb->i_rep) { i = (i + cb->size) % (cb->size + 1); /* (i - 1 + (S+1)) % (S+1) */ @@ -1141,6 +1178,7 @@ cbuf_find_replay_line (cbuf_t cb, int chars, int lines, int *nl) --lines; } m = n - 1; /* do not include preceding '\n' */ + ++l; } if ((chars == 0) || (lines == 0)) { break; @@ -1153,30 +1191,39 @@ cbuf_find_replay_line (cbuf_t cb, int chars, int lines, int *nl) --lines; } m = n; + ++l; } if (lines > 0) { return(0); /* all or none, and not enough found */ } + *nlines = l; return(m); } static int -cbuf_find_unread_line (cbuf_t cb, int chars, int lines) +cbuf_find_unread_line (cbuf_t cb, int chars, int *nlines) { /* Finds the specified number of lines from the unread region of the buffer. - * If ([lines] > 0), returns the number of bytes comprising the line count - * specified by [lines], or 0 if this number is not found (ie, all or none). - * If ([lines] == -1), returns the number of bytes comprising the maximum line - * count contained within the number of characters specified by [chars]. + * If ([nlines] > 0), returns the number of bytes comprising the line count, + * or 0 if this number of lines is not available (ie, all or none). + * If ([nlines] == -1), returns the number of bytes comprising the maximum + * line count bounded by the number of characters specified by [chars]. * Only complete lines (ie, those terminated by a newline) are counted. + * Sets the value-result parameter [nlines] to the number of lines found. */ - int i, n, m; + int i, n, m, l; + int lines; assert(cb != NULL); - assert(lines >= -1); + assert(nlines != NULL); + assert(*nlines >= -1); assert(cbuf_mutex_is_locked(cb)); + n = m = l = 0; + lines = *nlines; + *nlines = 0; + if ((lines == 0) || ((lines <= -1) && (chars <= 0))) { return(0); } @@ -1186,7 +1233,6 @@ cbuf_find_unread_line (cbuf_t cb, int chars, int lines) if (lines > 0) { chars = -1; /* chars parm not used if lines > 0 */ } - n = m = 0; i = cb->i_out; while (i != cb->i_in) { ++n; @@ -1198,6 +1244,7 @@ cbuf_find_unread_line (cbuf_t cb, int chars, int lines) --lines; } m = n; + ++l; } if ((chars == 0) || (lines == 0)) { break; @@ -1207,6 +1254,7 @@ cbuf_find_unread_line (cbuf_t cb, int chars, int lines) if (lines > 0) { return(0); /* all or none, and not enough found */ } + *nlines = l; return(m); } diff --git a/src/common/cbuf.h b/src/common/cbuf.h index 353088970c7f051610bdba91fcdb1f099f7d69ed..f2368ef2fa6bd9645d0997ef4a9dfc5e46af1858 100644 --- a/src/common/cbuf.h +++ b/src/common/cbuf.h @@ -1,9 +1,7 @@ /***************************************************************************** * $Id$ ***************************************************************************** - * $LSDId: cbuf.h,v 1.17 2002/12/03 20:34:03 dun Exp $ - ***************************************************************************** - * Copyright (C) 2002 The Regents of the University of California. + * Copyright (C) 2002-2003 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). * Written by Chris Dunlap <cdunlap@llnl.gov>. * @@ -25,8 +23,8 @@ *****************************************************************************/ -#ifndef _LSD_CBUF_H -#define _LSD_CBUF_H +#ifndef LSD_CBUF_H +#define LSD_CBUF_H /*********** @@ -121,11 +119,21 @@ int cbuf_used (cbuf_t cb); * Returns the number of bytes in [cb] available for reading. */ +int cbuf_lines_used (cbuf_t cb); +/* + * Returns the number of lines in [cb] available for reading. + */ + int cbuf_reused (cbuf_t cb); /* * Returns the number of bytes in [cb] available for replaying/rewinding. */ +int cbuf_lines_reused (cbuf_t cb); +/* + * Returns the number of lines in [cb] available for replaying/rewinding. + */ + int cbuf_is_empty (cbuf_t cb); /* * Returns non-zero if [cb] is empty; o/w, returns zero. @@ -308,4 +316,4 @@ int cbuf_move (cbuf_t src, cbuf_t dst, int len, int *ndropped); */ -#endif /* !_LSD_CBUF_H */ +#endif /* !LSD_CBUF_H */ diff --git a/src/slurmd/io.c b/src/slurmd/io.c index 3c0bbb3144181c78ad5926d82dc0e7f5b6bf889a..373404e4e45f2ab215a8283a6bd050cf2bd29cd3 100644 --- a/src/slurmd/io.c +++ b/src/slurmd/io.c @@ -909,6 +909,8 @@ _io_obj(slurmd_job_t *job, task_info_t *t, int fd, int type) obj->ops = _ops_copy(&client_ops); io->buf = cbuf_create(1024, 1048576); io->writers = list_create(NULL); + + cbuf_opt_set(io->buf, CBUF_OPT_OVERWRITE, CBUF_WRAP_ONCE); break; case CLIENT_STDIN: obj->ops = _ops_copy(&client_ops); @@ -1192,7 +1194,7 @@ _write(io_obj_t *obj, List objs) if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) return 0; if ((errno == EPIPE) || (errno == EINVAL) || (errno == EBADF)) - _obj_close(obj, objs); + _obj_close(obj, objs); else error("write failed: <task %d>: %m", io->id); return -1; diff --git a/src/srun/io.c b/src/srun/io.c index f67956c0569860e2875fae8408bfee94e736cb25..0952f647df18961b6d0f1b351dee77b437ca1a54 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -106,6 +106,8 @@ static int _validate_header(slurm_io_stream_header_t *hdr, job_t *job); */ static bool stdin_got_eof = false; static bool stdin_open = true; +static uint32_t nbytes = 0; +static uint32_t nwritten = 0; static int _do_task_output_poll(fd_info_t *info) @@ -164,16 +166,33 @@ _update_task_state(job_t *job, int taskid) static void _do_output(cbuf_t buf, FILE *out, int tasknum) { - int len; + int len = 0; + int tot = 0; char line[4096]; while ((len = cbuf_read_line(buf, line, sizeof(line), 1))) { + int n = len; + if (opt.labelio) fprintf(out, "%0*d: ", fmt_width, tasknum); - fputs(line, out); - fflush(out); + + if ((n -= fprintf(out, "%s", line)) != 0) { + error("Need to rewind %d bytes", n); + cbuf_rewind(buf, n); + } + + tot += (len - n); } + /* if (fflush(out) == EOF) + error ("fflush: %m"); + */ + + debug3("Wrote %d bytes output. %d still buffered", + tot, cbuf_used(buf)); + + nwritten += tot; + } static void @@ -199,6 +218,9 @@ _flush_io(job_t *job) if (job->err[i] != IO_DONE) _close_stream(&job->err[i], stderr, i); } + + fclose(job->outstream); + debug3("Read %dB from tasks, wrote %dB", nbytes, nwritten); } static void * @@ -291,9 +313,12 @@ _io_thr_poll(void *job_arg) /* exit if we have received EOF on all streams */ if (eofcnt) { if (eofcnt == opt.nprocs) { + debug("got EOF on all streams"); _flush_io(job); pthread_exit(0); - } if (time_first_done == 0) + } + + if (time_first_done == 0) time_first_done = time(NULL); } @@ -324,6 +349,8 @@ _io_thr_poll(void *job_arg) } } + debug3("poll returned with rc = %d", rc); + for (i = 0; i < job->niofds; i++) { if (fds[i].revents) { if (_poll_err(fds[i])) @@ -375,6 +402,7 @@ static void _do_poll_timeout (job_t *job) } if (eofcnt == opt.nprocs) { + debug("In poll_timeout(): EOF on all streams"); _flush_io(job); pthread_exit(0); } @@ -532,6 +560,13 @@ open_streams(job_t *job) if (!job->outstream || !job->errstream || (job->stdinfd < 0)) return -1; + /* + * Turn off buffering of output stream, since we're doing it + * with our own buffers. (Also, stdio buffering seems to + * causing some problems with loss of output) + */ + setvbuf(job->outstream, NULL, _IONBF, 0); + return 0; } @@ -677,7 +712,6 @@ _close_stream(int *fd, FILE *out, int tasknum) int retval; debug2("%d: <%s disconnected>", tasknum, out == stdout ? "stdout" : "stderr"); - fflush(out); retval = shutdown(*fd, SHUT_RDWR); if ((retval >= 0) || (errno != EBADF)) close(*fd); @@ -688,23 +722,23 @@ _close_stream(int *fd, FILE *out, int tasknum) static int _do_task_output(int *fd, FILE *out, cbuf_t buf, int tasknum) { - char line[IO_BUFSIZ]; int len = 0; int dropped = 0; if ((len = cbuf_write_from_fd(buf, *fd, -1, &dropped)) <= 0) { - if ((len != 0)) + if (len < 0) error("Error task %d IO: %m", tasknum); _close_stream(fd, out, tasknum); return len; } - while ((len = cbuf_read_line(buf, line, sizeof(line), 1))) { - if (opt.labelio) - fprintf(out, "%0*d: ", fmt_width, tasknum); - fputs(line, out); - fflush(out); - } + debug3("Wrote %d bytes into buffer of size %d", len, cbuf_used(buf)); + if (dropped) + debug3("dropped %d bytes", dropped); + + nbytes += len; + + _do_output(buf, out, tasknum); return len; } diff --git a/src/srun/job.c b/src/srun/job.c index b5650dc3122a50a13a4fb75e58dda2b2a272452c..6e7f212bfa4060f33250e1fdd322b81232a5bdf4 100644 --- a/src/srun/job.c +++ b/src/srun/job.c @@ -1,5 +1,6 @@ /****************************************************************************\ * job.c - job data structure createion functions + * $Id$ ***************************************************************************** * Copyright (C) 2002 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). @@ -161,11 +162,14 @@ _job_create_internal(allocation_info_t *info) for (i = 0; i < opt.nprocs; i++) { job->task_state[i] = SRUN_TASK_INIT; - job->outbuf[i] = cbuf_create(1024, 1048576); - job->errbuf[i] = cbuf_create(1024, 1048576); + job->outbuf[i] = cbuf_create(4096, 1048576); + job->errbuf[i] = cbuf_create(4096, 1048576); job->inbuf[i] = cbuf_create(4096, 4096); - cbuf_opt_set(job->inbuf[i], CBUF_OPT_OVERWRITE, 0); + + cbuf_opt_set(job->outbuf[i], CBUF_OPT_OVERWRITE, CBUF_NO_DROP); + cbuf_opt_set(job->errbuf[i], CBUF_OPT_OVERWRITE, CBUF_NO_DROP); + cbuf_opt_set(job->inbuf[i], CBUF_OPT_OVERWRITE, CBUF_NO_DROP); job->stdin_eof[i] = false; } diff --git a/src/srun/srun.c b/src/srun/srun.c index 3d500e2121df98d29f5cfea3b6d525c813d8bb2b..ffb2da72f270b9d7c36735372e024ccd77a2bbfc 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -210,22 +210,24 @@ srun(int ac, char **av) fwd_signal(job, SIGINT); } - /* Tell slurmctld that job is done */ - job_destroy(job, 0); /* wait for launch thread */ if (pthread_join(job->lid, NULL) < 0) error ("Waiting on launch thread: %m"); + debug("Waiting for IO thread"); /* wait for stdio */ if (pthread_join(job->ioid, NULL) < 0) error ("Waiting on IO: %m"); + /* Tell slurmctld that job is done */ + job_destroy(job, 0); + /* kill msg server thread */ - pthread_kill(job->jtid, SIGHUP); + /*pthread_kill(job->jtid, SIGHUP);*/ /* kill signal thread */ - pthread_kill(job->sigid, SIGHUP); + /*pthread_kill(job->sigid, SIGHUP);*/ log_fini(); return 0;