From e65a603ec6241b4d2a74afa8dfbd48fb70f9be55 Mon Sep 17 00:00:00 2001 From: Mark Grondona <mgrondona@llnl.gov> Date: Wed, 18 Dec 2002 00:45:04 +0000 Subject: [PATCH] o more IO fixes. Use constant size buffer for stdin. slurmd/fname.[ch] : Pre-open and truncate all unique output filenames slurmd/io.c : make call to truncate all output files. --- src/slurmd/fname.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++ src/slurmd/fname.h | 1 + src/slurmd/io.c | 46 ++++++++++++++++++++------------ src/srun/io.c | 13 +++++++--- src/srun/job.c | 2 +- 5 files changed, 106 insertions(+), 21 deletions(-) diff --git a/src/slurmd/fname.c b/src/slurmd/fname.c index 55d08a31cfc..9d6bc9d1cf1 100644 --- a/src/slurmd/fname.c +++ b/src/slurmd/fname.c @@ -28,6 +28,9 @@ #include <stdlib.h> #include <string.h> #include <ctype.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> #include "src/slurmd/job.h" #include "src/slurmd/fname.h" @@ -116,3 +119,65 @@ fname_create(slurmd_job_t *job, const char *format, int taskid) return name; } +static int +find_fname(void *obj, void *key) +{ + char *str = obj; + char *name = key; + + if (strcmp(str, name) == 0) + return 1; + return 0; +} + +static int +_trunc_file(char *path) +{ + int flags = O_CREAT|O_TRUNC|O_WRONLY; + int fd; + + do { + fd = open(path, flags, 0644); + } while ((fd < 0) && (errno == EINTR)); + + if (fd < 0) { + error("Unable to open `%s': %m", path); + return -1; + } else + debug3("opened and truncated `%s'", path); + + if (close(fd) < 0) + error("Unable to close `%s': %m", path); + + return 0; +} + +static void +fname_free(void *name) +{ + xfree(name); +} +int +fname_trunc_all(slurmd_job_t *job, const char *fmt) +{ + int i, rc; + char *fname; + ListIterator filei; + List files = list_create((ListDelF)fname_free); + + for (i = 0; i < job->ntasks; i++) { + fname = fname_create(job, fmt, job->task[i]->gid); + if (!list_find_first(files, (ListFindF) find_fname, fname)) + list_append(files, (void *)fname); + } + + filei = list_iterator_create(files); + while ((fname = list_next(filei))) { + if ((rc = _trunc_file(fname)) < 0) + goto done; + } + + done: + list_destroy(files); + return rc; +} diff --git a/src/slurmd/fname.h b/src/slurmd/fname.h index 65148796af5..eaf2b41f55e 100644 --- a/src/slurmd/fname.h +++ b/src/slurmd/fname.h @@ -30,5 +30,6 @@ #include "src/slurmd/job.h" char *fname_create(slurmd_job_t *job, const char *fmt, int taskid); +int fname_trunc_all(slurmd_job_t *job, const char *fmt); #endif /* !_SLURMD_FNAME_H */ diff --git a/src/slurmd/io.c b/src/slurmd/io.c index 19541665c7a..2f980ce337c 100644 --- a/src/slurmd/io.c +++ b/src/slurmd/io.c @@ -486,13 +486,17 @@ _io_prepare_clients(slurmd_job_t *job) srun = list_peek(job->sruns); xassert(srun != NULL); + if (srun->ofname && (fname_trunc_all(job, srun->ofname) < 0)) + return SLURM_FAILURE; + + if (srun->efname && (fname_trunc_all(job, srun->efname) < 0)) + return SLURM_FAILURE; + if (srun->ioaddr.sin_addr.s_addr) { char host[256]; short port; slurm_get_addr(&srun->ioaddr, &port, host, sizeof(host)); - if (port) - debug2("connecting IO back to %s:%d", - host, ntohs(port)); + debug2("connecting IO back to %s:%d", host, ntohs(port)); } /* Connect stdin/out/err to either a remote srun or @@ -533,10 +537,10 @@ static int _open_output_file(slurmd_job_t *job, task_info_t *t, char *fmt, slurmd_io_type_t type) { - int fd = -1; - io_obj_t *obj = NULL; - int flags = O_CREAT|O_TRUNC|O_APPEND|O_WRONLY; - char *fname = fname_create(job, fmt, t->gid); + int fd = -1; + io_obj_t *obj = NULL; + int flags = O_APPEND|O_WRONLY; + char *fname = fname_create(job, fmt, t->gid); xassert((type == CLIENT_STDOUT) || (type == CLIENT_STDERR)); @@ -1328,14 +1332,21 @@ _client_read(io_obj_t *obj, List objs) xassert(_validate_io_list(objs)); xassert(_isa_client(client)); - i = list_iterator_create(client->readers); - while ((reader = list_next(i))) { - if (cbuf_free(reader->buf) < len) - len = cbuf_free(reader->buf); - } + /* + * Determine the maximum amount of data we will + * safely be able to read + */ + if (client->readers) { + i = list_iterator_create(client->readers); + while ((reader = list_next(i))) { + if (cbuf_free(reader->buf) < len) + len = cbuf_free(reader->buf); + } + list_iterator_destroy(i); - if (len == 0) - return 0; + if (len == 0) + return 0; + } again: @@ -1391,11 +1402,12 @@ _client_read(io_obj_t *obj, List objs) /* * Copy cbuf to all readers */ - list_iterator_reset(i); + i = list_iterator_create(client->readers); while((reader = list_next(i))) { n = cbuf_write(reader->buf, (void *) buf, n, &dropped); - error ("Dropped %d bytes stdin data to task %d", - dropped, client->id); + if (dropped > 0) + error("Dropped %d bytes stdin data to task %d", + dropped, client->id); } list_iterator_destroy(i); diff --git a/src/srun/io.c b/src/srun/io.c index f20b6a888d9..0c30ac2a4dc 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -95,6 +95,7 @@ static int _validate_header(slurm_io_stream_header_t *hdr, job_t *job); #define _poll_rd_isset(pfd) ((pfd).revents & POLLIN ) #define _poll_wr_isset(pfd) ((pfd).revents & POLLOUT) #define _poll_err(pfd) ((pfd).revents & POLLERR) +#define _poll_hup(pfd) ((pfd).revents & POLLHUP) /* True if an EOF needs to be broadcast to all tasks */ @@ -309,15 +310,21 @@ _io_thr_poll(void *job_arg) for (i = 0; i < job->niofds; i++) { if (fds[i].revents) { - if (fds[i].revents & POLLERR) + if (_poll_err(fds[i])) error("poll error on io fd %d", i); else _accept_io_stream(job, i); } } - if ((job->stdinfd >= 0) && _poll_rd_isset(fds[i++])) - _bcast_stdin(job->stdinfd, job); + if ((job->stdinfd >= 0)) { + if (_poll_rd_isset(fds[i])) + _bcast_stdin(job->stdinfd, job); + if (_poll_hup(fds[i]) || _poll_err(fds[i])) + _bcast_stdin(job->stdinfd, job); + ++i; + } + for ( ; i < nfds; i++) { unsigned short revents = fds[i].revents; diff --git a/src/srun/job.c b/src/srun/job.c index efa9824fc01..ae41837f749 100644 --- a/src/srun/job.c +++ b/src/srun/job.c @@ -133,7 +133,7 @@ _job_create_internal(allocation_info_t *info) job->outbuf[i] = cbuf_create(1024, 1048576); job->errbuf[i] = cbuf_create(1024, 1048576); - job->inbuf[i] = cbuf_create(256, 1048576); + job->inbuf[i] = cbuf_create(4096, 4096); cbuf_opt_set(job->inbuf[i], CBUF_OPT_OVERWRITE, 0); job->stdin_eof[i] = false; -- GitLab