diff --git a/src/srun/io.c b/src/srun/io.c index 5e64acdc243986538b61eaf329a8c3f5ea82656c..d58937fd251157f3878c4e6b4232ae9fdabaa772 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -217,6 +217,7 @@ _io_thr_poll(void *job_arg) _do_task_output_poll(&map[i]); } } + xfree(fds); } static void @@ -233,8 +234,8 @@ static void _set_iofds_nonblocking(job); for (i = 0; i < opt.nprocs; i++) { - job->out[i] = -1; - job->err[i] = -1; + job->out[i] = WAITING_FOR_IO; + job->err[i] = WAITING_FOR_IO; } while (1) { @@ -463,7 +464,7 @@ _readn(int fd, void *buf, size_t nbytes) static void _bcast_stdin(int fd, job_t *job) { - int i; + int i, disc=0; char buf[IO_BUFSIZ]; size_t len; @@ -476,8 +477,17 @@ _bcast_stdin(int fd, job_t *job) } } - for (i = 0; i < opt.nprocs; i++) - write(job->out[i], buf, len); + /* broadcast to every connected task */ + for (i = 0; i < opt.nprocs; i++) { + if ((job->out[i] == WAITING_FOR_IO) || + (job->out[i] == IO_DONE)) + disc++; + else + write(job->out[i], buf, len); + } + if (disc) + error("Stdin could not be sent to %d disconnected tasks", + disc); return; } diff --git a/src/srun/job.c b/src/srun/job.c index f1edb8fc63a7f3ae4e3411651826f4cd9cd5a592..266daa6de61df3705157e90c02ca7299660d122a 100644 --- a/src/srun/job.c +++ b/src/srun/job.c @@ -65,7 +65,6 @@ job_create(resource_allocation_response_msg_t *resp) job->cred->signature[0] = 'a'; job->nodelist = xstrdup(opt.nodelist); -debug("nodelist=%s",job->nodelist); hl = hostlist_create(opt.nodelist); srand48(getpid()); job->jobid = (uint32_t) (lrand48() % 65550L + 1L); @@ -76,7 +75,6 @@ debug("nodelist=%s",job->nodelist); job->nhosts = hostlist_count(hl); -debug("nhosts=%d",job->nhosts); job->host = (char **) xmalloc(job->nhosts * sizeof(char *)); job->slurmd_addr = (slurm_addr *) xmalloc(job->nhosts * sizeof(slurm_addr)); diff --git a/src/srun/msg.c b/src/srun/msg.c index 65df5205097effa72a095242cf6ca57f8b555693..18d1f4e62b82d6b3742a3d76cf132db24b1d1902 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -130,7 +130,7 @@ _accept_msg_connection(job_t *job, int fdnum) char addrbuf[256]; if ((fd = slurm_accept_msg_conn(job->jfd[fdnum], &cli_addr)) < 0) { - error("slurm_accept_msg_conn: %m"); + error("_accept_msg_connection/slurm_accept_msg_conn: %m"); return; } @@ -139,7 +139,7 @@ _accept_msg_connection(job_t *job, int fdnum) msg = xmalloc(sizeof(*msg)); if (slurm_receive_msg(fd, msg) == SLURM_SOCKET_ERROR) { - error("slurm_receive_msg: %m"); + error("_accept_msg_connection/slurm_receive_msg: %m"); return; } @@ -232,7 +232,7 @@ _msg_thr_one(void *arg) while (1) { if ((newfd = slurm_accept_msg_conn(fd, &cli_addr)) < 0) { - error("slurm_accept_msg_conn: rc=%d", errno); + error("_msg_thr_one/slurm_accept_msg_conn: %m"); break; } @@ -242,7 +242,7 @@ _msg_thr_one(void *arg) msg = xmalloc(sizeof(*msg)); if (slurm_receive_msg(newfd, msg) == SLURM_SOCKET_ERROR) { - error("slurm_receive_msg: rc=%d\n", errno); + error("_msg_thr_one/slurm_receive_msg: %m"); break; }