diff --git a/src/slurmd/io.c b/src/slurmd/io.c index b9be3f73b9e0163c334ec61bf40fec4efa8b768b..659e67896dbb1045bec4b6b0eeee196d2888fbe7 100644 --- a/src/slurmd/io.c +++ b/src/slurmd/io.c @@ -496,8 +496,10 @@ _io_prepare_one(slurmd_job_t *j, task_info_t *t, srun_info_t *s) _io_add_connecting(j, t, s, CLIENT_STDIN); } - if (!list_find_first(t->srun_list, (ListFindF) find_obj, s)) + if (!list_find_first(t->srun_list, (ListFindF) find_obj, s)) { + debug("appending new client to srun_list for task %d", t->gid); list_append(t->srun_list, (void *) s); + } return SLURM_SUCCESS; } @@ -1187,7 +1189,8 @@ _write(io_obj_t *obj, List objs) return 0; if ((errno == EPIPE) || (errno == EINVAL) || (errno == EBADF)) _obj_close(obj, objs); - error("write failed: <task %d>: %m", io->id); + else + error("write failed: <task %d>: %m", io->id); return -1; } diff --git a/src/srun/io.c b/src/srun/io.c index 901df8f0e9d31d0e99745f87bc3f27c7e2dd4ac8..89dd749faf2f5b01e944abef182f48096d93ed88 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -35,6 +35,7 @@ #include <stdio.h> #include <string.h> #include <time.h> +#include <signal.h> #include "src/common/fd.h" #include "src/common/log.h" @@ -44,6 +45,7 @@ #include "src/common/slurm_protocol_pack.h" #include "src/common/xassert.h" #include "src/common/xmalloc.h" +#include "src/common/xsignal.h" #include "src/srun/io.h" #include "src/srun/job.h" @@ -102,6 +104,7 @@ static int _validate_header(slurm_io_stream_header_t *hdr, job_t *job); /* True if an EOF needs to be broadcast to all tasks */ static bool stdin_got_eof = false; +static bool stdin_open = true; static int _do_task_output_poll(fd_info_t *info) @@ -145,6 +148,7 @@ _set_iofds_nonblocking(job_t *job) int i; for (i = 0; i < job->niofds; i++) fd_set_nonblocking(job->iofd[i]); + fd_set_nonblocking(job->stdinfd); } static void @@ -244,7 +248,7 @@ _io_thr_poll(void *job_arg) int eofcnt = 0; nfds = job->niofds; /* already have n ioport fds + stdin */ - if ((job->stdinfd >= 0) && !stdin_got_eof) { + if ((job->stdinfd >= 0) && stdin_open) { _poll_set_rd(fds[nfds], job->stdinfd); nfds++; } @@ -325,7 +329,7 @@ _io_thr_poll(void *job_arg) } } - if ((job->stdinfd >= 0) && !stdin_got_eof) { + if ((job->stdinfd >= 0) && stdin_open) { if (fds[i].revents) _bcast_stdin(job->stdinfd, job); ++i; @@ -560,6 +564,8 @@ io_thr_create(job_t *job) return SLURM_ERROR; } + xsignal(SIGTTIN, SIG_IGN); + pthread_attr_init(&attr); if ((errno = pthread_create(&job->ioid, &attr, &io_thr, (void *) job))) return SLURM_ERROR; @@ -704,7 +710,6 @@ _do_task_input(job_t *job, int taskid) if ( stdin_got_eof && !job->stdin_eof[taskid] && (cbuf_used(buf) == 0) ) { - /* write(fd, &eot, 1); */ job->stdin_eof[taskid] = true; shutdown(job->out[taskid], SHUT_WR); return 0; @@ -723,10 +728,9 @@ _readx(int fd, char *buf, size_t maxbytes) { size_t n; - again: if ((n = read(fd, (void *) buf, maxbytes)) < 0) { if (errno == EINTR) - goto again; + return -1; if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) return -1; @@ -780,6 +784,15 @@ _write_all(job_t *job, cbuf_t cb, char *buf, size_t len, int taskid) error ("Dropped %d bytes stdin data", dropped); } +static void +_close_stdin(job_t *j) +{ + close(j->stdinfd); + j->stdinfd = IO_DONE; + stdin_got_eof = true; + stdin_open = false; +} + static void _bcast_stdin(int fd, job_t *job) { @@ -802,18 +815,20 @@ _bcast_stdin(int fd, job_t *job) if (len == 0) return; - if ((n = _readx(fd, buf, len)) <= 0) { - if (n == 0) { /* got EOF */ - close(job->stdinfd); - job->stdinfd = IO_DONE; - stdin_got_eof = true; - return; - } else { + if ((n = _readx(fd, buf, len)) < 0) { + if (errno == EIO) { + stdin_open = false; + debug2("disabling stdin"); + } else if (errno != EINTR) error("error reading stdin. %m"); - return; - } + return; } + if (n == 0) { + _close_stdin(job); + return; + } + if (job->ifname->type == IO_ONE) { i = job->ifname->taskid; _write_all(job, job->inbuf[i], buf, n, i); @@ -851,3 +866,4 @@ static int _validate_header(slurm_io_stream_header_t *hdr, job_t *job) return SLURM_SUCCESS; } + diff --git a/src/srun/reattach.c b/src/srun/reattach.c index 86490eda9736235d3bbd2a0daf30570075929e03..e25e350c796608bc729b3218d891b8d287bbff47 100644 --- a/src/srun/reattach.c +++ b/src/srun/reattach.c @@ -470,7 +470,12 @@ int reattach() log_set_argv0(new_argv0); } -/* if (opt.join) MARK: IS THIS NEEDED?? */ + /* + * mask and handle certain signals iff we are "joining" with + * the job in question. If opt.join is off, attached srun is in + * "read-only" mode and cannot forward stdin/signals. + */ + if (opt.join) sig_setup_sigmask(); if (msg_thr_create(job) < 0) { @@ -512,6 +517,6 @@ int reattach() static bool _job_all_done(job_t *job) { - return (job->state == SRUN_JOB_DETACHED); + return (job->state >= SRUN_JOB_TERMINATED); } diff --git a/src/srun/signals.c b/src/srun/signals.c index 606f9b90f993d2109a01d65335f917139a746bd7..54c9bdef11e0b05dab6870b491fdc2f32e0ca650 100644 --- a/src/srun/signals.c +++ b/src/srun/signals.c @@ -95,6 +95,7 @@ sig_setup_sigmask(void) sigaddset(&sigset, SIGQUIT); sigaddset(&sigset, SIGTSTP); sigaddset(&sigset, SIGSTOP); + sigaddset(&sigset, SIGCONT); if (sigprocmask(SIG_BLOCK, &sigset, NULL) != 0) { error("sigprocmask: %m"); @@ -204,6 +205,7 @@ _sig_thr_setup(sigset_t *set) sigaddset(set, SIGQUIT); sigaddset(set, SIGTSTP); sigaddset(set, SIGSTOP); + sigaddset(set, SIGCONT); if ((rc = pthread_sigmask(SIG_BLOCK, set, NULL)) != 0) error ("pthread_sigmask: %s", slurm_strerror(rc)); } @@ -229,9 +231,14 @@ _sig_thr(void *arg) _handle_intr(job, &last_intr, &last_intr_sent); break; case SIGSTOP: - case SIGTSTP: debug3("Ignoring SIGSTOP"); break; + case SIGTSTP: + debug3("got SIGTSTP"); + break; + case SIGCONT: + debug3("got SIGCONT"); + break; case SIGQUIT: info("Quit"); job_force_termination(job);