diff --git a/src/slurmd/io.c b/src/slurmd/io.c index b3e33aa933490cb2ba01983388a776c9ca4fe0c9..e6b84f1f61d4ac82c847794003580ca926c15f5d 100644 --- a/src/slurmd/io.c +++ b/src/slurmd/io.c @@ -42,6 +42,7 @@ #endif #include <sys/types.h> +#include <sys/socket.h> #include <sys/stat.h> #include <fcntl.h> #include <unistd.h> @@ -66,7 +67,7 @@ typedef enum slurmd_io_tupe { TASK_STDIN, CLIENT_STDERR, CLIENT_STDOUT, - CLIENT_STDIN + CLIENT_STDIN, } slurmd_io_type_t; static char *_io_str[] = @@ -76,7 +77,7 @@ static char *_io_str[] = "task stdin", "client stderr", "client stdout", - "client stdin" + "client stdin", }; /* The IO information structure @@ -101,6 +102,10 @@ struct io_info { * (e.g. A "ghost" client attached * to a task.) */ + + unsigned rw:1; /* 1 if client is read-write + * capable, 0 otherwize + */ }; @@ -260,8 +265,6 @@ static void _io_finalize(task_info_t *t) { struct io_info *in = t->in->arg; - ListIterator i; - struct io_info *io; if (_xclose(t->pin[0] ) < 0) error("close(stdin) : %m"); @@ -272,16 +275,27 @@ _io_finalize(task_info_t *t) in->disconnected = 1; - if (!in->writers) - return; - - i = list_iterator_create(in->writers); - while ((io = list_next(i))) { - if (io->obj->fd > 0) { - io->eof = 1; + /* Need to close all stdin writers + * + * We effectively close these writers by + * forcing them to be unwritable. This will + * prevent the IO thread from hanging waiting + * for stdin data. (While also not forcing the + * close of a pipe that is also writable) + */ + + if (in->writers) { + ListIterator i; + struct io_info *io; + + i = list_iterator_create(in->writers); + while ((io = list_next(i))) { + if (io->obj->fd > 0) { + io->obj->ops->readable = NULL; + } } + list_iterator_destroy(i); } - list_iterator_destroy(i); } void @@ -316,7 +330,9 @@ _handle_unprocessed_output(slurmd_job_t *job) continue; if (io->buf && (n = cbuf_used(io->buf))) - job_error(job, "%ld bytes of stdout unprocessed", n); + job_error(job, + "task %d: %ld bytes of stdout unprocessed", + io->id, n); if (!(readers = ((struct io_info *)t->err->arg)->readers)) continue; @@ -412,6 +428,15 @@ _io_add_connecting(slurmd_job_t *job, task_info_t *t, srun_info_t *srun, obj = _io_obj(job, t, sock, type); obj->ops = &connecting_client_ops; _io_write_header(obj->arg, srun); + + if ((type == CLIENT_STDOUT) && !srun->ifname) { + struct io_info *io = obj->arg; + /* This is the only read-write capable client + * at this time: a connected CLIENT_STDOUT + */ + io->rw = 1; + } + list_append(job->objs, (void *)obj); } @@ -538,8 +563,8 @@ _open_stdin_file(slurmd_job_t *job, task_info_t *t, srun_info_t *srun) char *fname = fname_create(job, srun->ifname, t->gid); if ((fd = _open_task_file(fname, flags)) > 0) { + debug("opened `%s' for %s fd %d", fname, "stdin", fd); obj = _io_obj(job, t, fd, CLIENT_STDIN); - _obj_set_unwritable(obj); _io_client_attach(obj, NULL, t->in, job->objs); } xfree(fname); @@ -570,10 +595,16 @@ _io_client_attach(io_obj_t *client, io_obj_t *writer, * reader is still available. * */ - if (reader->fd < 0 || dst->disconnected) + if (reader->fd < 0 || dst->disconnected) { + debug3("can't attach %s to closed %s", + _io_str[cli->type], _io_str[dst->type]); _obj_close(client, objList); - else - _io_connect_objs(client, reader); + return; + } + + _io_connect_objs(client, reader); + if (!list_find_first(objList, (ListFindF) find_obj, client)) + list_append(objList, client); return; } @@ -654,8 +685,15 @@ _io_connect_objs(io_obj_t *obj1, io_obj_t *obj2) if (!list_find_first(src->readers, (ListFindF)find_obj, dst)) list_append(src->readers, dst); + else + debug3("%s already in %s readers list!", + _io_str[dst->type], _io_str[src->type]); + if (!list_find_first(dst->writers, (ListFindF)find_obj, src)) list_append(dst->writers, src); + else + debug3("%s already in %s writers list!", + _io_str[src->type], _io_str[dst->type]); } /* @@ -802,8 +840,12 @@ _io_obj(slurmd_job_t *job, task_info_t *t, int fd, int type) break; case TASK_STDIN: obj->ops = &task_in_ops; - io->buf = cbuf_create(512, 10240); + io->buf = cbuf_create(512, 4096); io->writers = list_create(NULL); + + /* Never overwrite stdin data + */ + cbuf_opt_set(io->buf, CBUF_OPT_OVERWRITE, 0); break; case CLIENT_STDOUT: io->readers = list_create(NULL); @@ -818,6 +860,7 @@ _io_obj(slurmd_job_t *job, task_info_t *t, int fd, int type) io->readers = list_create(NULL); /* * Connected stdin still needs output buffer + * (for connection header) */ io->buf = cbuf_create(256, 1024); break; @@ -903,6 +946,7 @@ _io_info_create(uint32_t id) io->writers = NULL; io->eof = 0; io->disconnected = 0; + io->rw = 0; xassert(io->magic = IO_MAGIC); return io; } @@ -1082,7 +1126,7 @@ _write(io_obj_t *obj, List objs) while ((n = cbuf_read_to_fd(io->buf, obj->fd, -1)) < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) - break; + return 0; if ((errno == EPIPE) || (errno == EINVAL) || (errno == EBADF)) _obj_close(obj, objs); error("write failed: <task %d>: %m", io->id); @@ -1109,7 +1153,17 @@ _do_attach(struct io_info *io) switch (io->type) { case CLIENT_STDOUT: - _io_client_attach(io->obj, t->out, t->in, io->job->objs); + if (io->rw) { + debug3("attaching task %d client stdout read-write", + io->id); + _io_client_attach( io->obj, t->out, t->in, + io->job->objs ); + } else { + debug3("attaching task %d client stdout write-only", + io->id); + _io_client_attach( io->obj, t->out, NULL, + io->job->objs ); + } break; case CLIENT_STDERR: _io_client_attach(io->obj, t->err, NULL, io->job->objs); @@ -1163,7 +1217,6 @@ static int _shutdown_task_obj(struct io_info *t) { ListIterator i; - List rlist; struct io_info *r; xassert(_isa_task(t)); @@ -1175,12 +1228,16 @@ _shutdown_task_obj(struct io_info *t) t->disconnected = 1; - rlist = t->writers ? : t->readers; + if (!t->readers) + return 0; /* Task objects do not get destroyed. * Simply propagate the EOF to the clients + * + * Only propagate EOF to readers + * */ - i = list_iterator_create(rlist); + i = list_iterator_create(t->readers); while ((r = list_next(i))) r->eof = 1; list_iterator_destroy(i); @@ -1258,13 +1315,24 @@ _client_read(io_obj_t *obj, List objs) { struct io_info *client = (struct io_info *) obj->arg; struct io_info *reader; - char buf[1024]; /* XXX Configurable? */ - ssize_t n, len = sizeof(buf); - ListIterator i; + char buf[4096]; + int dropped = 0; + ssize_t n = 0; + ssize_t len = sizeof(buf); + ListIterator i = NULL; xassert(client->magic == IO_MAGIC); xassert(_validate_io_list(objs)); xassert(_isa_client(client)); + + i = list_iterator_create(client->readers); + while ((reader = list_next(i))) { + if (cbuf_free(reader->buf) < len) + len = cbuf_free(reader->buf); + } + + if (len == 0) + return 0; again: @@ -1279,8 +1347,8 @@ _client_read(io_obj_t *obj, List objs) client->id); if (n == 0) { /* got eof, pass this eof to readers */ - debug3("%s %d stdin closed connection", _io_str[client->type], - client->id); + debug3("task %d [%s fd %d] read closed", + client->id, _io_str[client->type], obj->fd); /* * Do not read from this stdin any longer */ @@ -1295,14 +1363,19 @@ _client_read(io_obj_t *obj, List objs) i = list_iterator_create(client->readers); while((reader = list_next(i))) { if (list_count(reader->writers) == 1) - reader->eof = 1; - list_delete_all( reader->writers, - (ListFindF) find_obj, - client ); + reader->eof = 1; + else + debug3("can't send EOF to stdin"); + } list_iterator_destroy(i); } + /* It is unsafe to close CLIENT_STDOUT + */ + if (client->type == CLIENT_STDIN) + _obj_close(obj, client->job->objs); + return 0; } @@ -1315,9 +1388,12 @@ _client_read(io_obj_t *obj, List objs) /* * Copy cbuf to all readers */ - i = list_iterator_create(client->readers); - while((reader = list_next(i))) - n = cbuf_write(reader->buf, (void *) buf, n, NULL); + list_iterator_reset(i); + while((reader = list_next(i))) { + n = cbuf_write(reader->buf, (void *) buf, n, &dropped); + error ("Dropped %d bytes stdin data to task %d", + dropped, client->id); + } list_iterator_destroy(i); return 0; diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c index fd07ab6c2d2ba35a6104598ed3ba9a326b1e6e52..34cac33a2a9a0dc80a0724799fb9b3318a7b99b1 100644 --- a/src/slurmd/mgr.c +++ b/src/slurmd/mgr.c @@ -699,6 +699,8 @@ _send_exit_msg(int rc, task_info_t *t) ListIterator i; srun_info_t *srun; + debug3("sending task exit msg for %d", t->gid); + /* FIXME:XXX: attempt to combine task IDs in single message */ task_id_list[0] = t->gid; msg.task_id_list = task_id_list; diff --git a/src/srun/io.c b/src/srun/io.c index bc469f0b2ffcc36d2f334529eecf71d371bae29a..f20b6a888d9c3df4f4ad603f15ae40baebd600fd 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -120,16 +120,20 @@ _handle_pollerr(fd_info_t *info) socklen_t size = sizeof(int); if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&err, &size) < 0) error("_handle_error_poll: getsockopt: %m"); - else if (err) { - if ((err != EPIPE) && (err != ECONNRESET)) - error("poll error on fd %d: %s", fd, slurm_strerror(err)); - _close_stream(info->fd, info->fp, info->taskid); - } - if (*info->fd > 0) - return _do_task_output_poll(info); - else - return 0; + if (err) + debug3("%d: poll error on fd %d: %s", + info->taskid, fd, slurm_strerror(err)); + else + debug3("%d: fd %d got hangup", info->taskid, fd); + + /* _do_task_output() should read EOF and close output + * stream if necessary. This way, any remaining data + * is read. + */ + _do_task_output(info->fd, info->fp, info->buf, info->taskid); + + return 0; } static void @@ -161,15 +165,26 @@ _do_output(cbuf_t buf, FILE *out, int tasknum) fputs(line, out); fflush(out); } + + if ((len = cbuf_used(buf))) + error ("Unable to print %d bytes output data", len); } static void _flush_io(job_t *job) { int i; + + debug3("flushing all io"); for (i = 0; i < opt.nprocs; i++) { + _do_output(job->outbuf[i], job->outstream, i); + if (job->out[i] != IO_DONE) + _close_stream(&job->out[i], stdout, i); + _do_output(job->errbuf[i], job->errstream, i); + if (job->err[i] != IO_DONE) + _close_stream(&job->err[i], stderr, i); } } @@ -232,7 +247,7 @@ _io_thr_poll(void *job_arg) if ( (cbuf_used(job->inbuf[i]) > 0) || (stdin_got_eof && !job->stdin_eof[i])) - _poll_set_wr(fds[nfds], job->out[i]); + fds[nfds].events |= POLLOUT; map[nfds].taskid = i; map[nfds].fd = &job->out[i]; @@ -266,8 +281,11 @@ _io_thr_poll(void *job_arg) time_first_done = time(NULL); } - if (job->state == SRUN_JOB_FAILED) + if (job->state == SRUN_JOB_FAILED) { + debug3("job state is failed"); + _flush_io(job); pthread_exit(0); + } while ((rc = poll(fds, nfds, POLL_TIMEOUT_MSEC)) <= 0) { if (rc == 0) { /* timeout */ @@ -304,9 +322,7 @@ _io_thr_poll(void *job_arg) for ( ; i < nfds; i++) { unsigned short revents = fds[i].revents; xassert(!(revents & POLLNVAL)); - if ( (revents & POLLERR) - || (revents & POLLHUP) - || (revents & POLLNVAL)) + if ((revents & POLLERR) || (revents & POLLHUP)) _handle_pollerr(&map[i]); if ((revents & POLLIN) && (*map[i].fd > 0)) @@ -331,9 +347,10 @@ static void _do_poll_timeout (job_t *job) } age = time(NULL) - time_first_done; - if (job->state == SRUN_JOB_FAILED) + if (job->state == SRUN_JOB_FAILED) { + debug3("job failed, exiting IO thread"); pthread_exit(0); - else if (time_first_done && opt.max_wait && + } else if (time_first_done && opt.max_wait && (age >= opt.max_wait) ) { error("First task terminated %d seconds ago", age); @@ -523,7 +540,7 @@ _accept_io_stream(job_t *job, int i) int len = size_io_stream_header(); int j; int fd = job->iofd[i]; - debug("Activity on IO server port %d fd %d", i, fd); + debug2("Activity on IO server port %d fd %d", i, fd); for (j = 0; j < 15; i++) { int sd, size_read; @@ -589,7 +606,7 @@ _accept_io_stream(job_t *job, int i) else job->err[hdr.task_id] = sd; - debug("accepted %s connection from %s task %ld, sd=%d", + debug2("accepted %s connection from %s task %ld, sd=%d", (hdr.type ? "stderr" : "stdout"), buf, hdr.task_id, sd ); } @@ -600,7 +617,7 @@ static int _close_stream(int *fd, FILE *out, int tasknum) { int retval; - debug("%d: <%s disconnected>", tasknum, + debug2("%d: <%s disconnected>", tasknum, out == stdout ? "stdout" : "stderr"); fflush(out); retval = shutdown(*fd, SHUT_RDWR); @@ -651,6 +668,8 @@ _do_task_input(job_t *job, int taskid) if ((len = cbuf_read_to_fd(buf, fd, -1)) < 0) error ("writing stdin data: %m"); + debug3("wrote %d bytes to task %d stdin", len, taskid); + return len; } @@ -702,25 +721,44 @@ static void _write_all(job_t *job, cbuf_t cb, char *buf, size_t len, int taskid) { int n = 0; + int dropped = 0; again: - n = cbuf_write(cb, buf, len, NULL); + n = cbuf_write(cb, buf, len, &dropped); if ((n < len) && (job->out[taskid] > 0)) { error("cbuf_write returned %d", n); _do_task_input(job, taskid); goto again; } + + if (dropped) + error ("Dropped %d bytes stdin data", dropped); } static void _bcast_stdin(int fd, job_t *job) { - int i; - size_t len; - char buf[4096]; + int i; + char buf[4096]; + ssize_t len = sizeof(buf); + ssize_t n = 0; + + if (job->ifname->type == IO_ONE) { + i = job->ifname->taskid; + if (cbuf_free(job->inbuf[i]) < len) + len = cbuf_free(job->inbuf[i]); + } else { + for (i = 0; i < opt.nprocs; i++) { + if (cbuf_free(job->inbuf[i]) < len) + len = cbuf_free(job->inbuf[i]); + } + } + + if (len == 0) + return; - if ((len = _readx(fd, buf, 4096)) <= 0) { - if (len == 0) { /* got EOF */ + if ((n = _readx(fd, buf, len)) <= 0) { + if (n == 0) { /* got EOF */ close(job->stdinfd); job->stdinfd = IO_DONE; stdin_got_eof = true; @@ -733,10 +771,10 @@ _bcast_stdin(int fd, job_t *job) if (job->ifname->type == IO_ONE) { i = job->ifname->taskid; - _write_all(job, job->inbuf[i], buf, len, i); + _write_all(job, job->inbuf[i], buf, n, i); } else { for (i = 0; i < opt.nprocs; i++) - _write_all(job, job->inbuf[i], buf, len, i); + _write_all(job, job->inbuf[i], buf, n, i); } return; diff --git a/src/srun/job.c b/src/srun/job.c index 623a390c075aa0688765ec8acf1150f12df75954..efa9824fc0170dfbc62c1f1f8254701881b470d0 100644 --- a/src/srun/job.c +++ b/src/srun/job.c @@ -27,6 +27,10 @@ #include <netdb.h> #include <string.h> #include <stdlib.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> #include "src/common/hostlist.h" #include "src/common/log.h" @@ -40,12 +44,12 @@ #include "src/srun/fname.h" typedef struct allocation_info { - uint32_t jobid; - uint32_t stepid; - char *nodelist; - slurm_addr *addrs; - int *cpus_per_node; - int *cpu_count_reps; + uint32_t jobid; + uint32_t stepid; + char *nodelist; + slurm_addr *addrs; + int *cpus_per_node; + int *cpu_count_reps; } allocation_info_t; @@ -82,7 +86,9 @@ _job_create_internal(allocation_info_t *info) job->stepid = info->stepid; job->slurmd_addr = xmalloc(job->nhosts * sizeof(slurm_addr)); - memcpy(job->slurmd_addr, info->addrs, sizeof(slurm_addr)*job->nhosts); + if (info->addrs) + memcpy( job->slurmd_addr, info->addrs, + sizeof(slurm_addr)*job->nhosts); job->host = (char **) xmalloc(job->nhosts * sizeof(char *)); job->cpus = (int *) xmalloc(job->nhosts * sizeof(int) ); @@ -182,6 +188,20 @@ job_create_allocation(resource_allocation_response_msg_t *resp) return (job); } +static void +_job_fake_cred(job_t *job) +{ + int fd; + if ((fd = open("/dev/random", O_RDONLY)) < 0) + error ("unable to open /dev/random: %m"); + + job->cred = xmalloc(sizeof(*job->cred)); + job->cred->job_id = job->jobid; + job->cred->user_id = opt.uid; + job->cred->expiration_time = 0x7fffffff; + read(fd, job->cred->signature, SLURM_SSL_SIGNATURE_LENGTH); +} + job_t * job_create_noalloc(void) { @@ -207,19 +227,21 @@ job_create_noalloc(void) info->cpus_per_node = &cpn; info->cpu_count_reps = &opt.nprocs; + info->addrs = NULL; /* * Create job, then fill in host addresses */ job = _job_create_internal(info); - job->slurmd_addr = xmalloc(job->nhosts * sizeof(slurm_addr)); for (i = 0; i < job->nhosts; i++) { slurm_set_addr ( &job->slurmd_addr[i], slurm_get_slurmd_port(), job->host[i] ); } + _job_fake_cred(job); + error: xfree(info); return (job); diff --git a/src/srun/msg.c b/src/srun/msg.c index 58482b15ca2833e005c070e7d049ca1705587990..3c03b1cfef5692cd549569c05d9ddb55528a1274 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -203,7 +203,7 @@ _exit_handler(job_t *job, slurm_msg_t *exit_msg) taskid, _taskid2hostname(taskid, job), msg->return_code); else - debug2("task %d exited with status 0", taskid); + verbose("task %d exited with status 0", taskid); slurm_mutex_lock(&job->task_mutex); job->tstatus[taskid] = msg->return_code; @@ -272,7 +272,7 @@ _handle_msg(job_t *job, slurm_msg_t *msg) slurm_free_task_exit_msg(msg->data); break; case RESPONSE_REATTACH_TASKS: - debug("recvd reattach response\n"); + debug2("recvd reattach response\n"); _reattach_handler(job, msg); slurm_free_reattach_tasks_response_msg(msg->data); break;