diff --git a/NEWS b/NEWS index 06be6c15eba1ba2cd815f16097059c60b807487f..2ad39977e042693e5643862d83cac947a193ff1c 100644 --- a/NEWS +++ b/NEWS @@ -6,8 +6,7 @@ documents those changes that are of interest to users and admins. -- New stdio protocol. Now srun has just a single TCP stream to each node of a job-step. srun and slurmd comminicate over the TCP stream using a simple messaging protocol. - WARNING: The stdio rewrite is incomplete (in particular --attach support - is still missing), and not yet debugged. + WARNING: The stdio rewrite is incomplete and not yet fully debugged. -- Added task plugin and use task prolog/epilog(s). -- New slurmd_step functionality added. Fork exec instead of using shared memory. Not completely tested. diff --git a/src/slurmd/io.c b/src/slurmd/io.c index 61a33686feed9af5a216101039bd58fa50702164..413af0551b73a95b8e1ec34cc7af78919f3d84b7 100644 --- a/src/slurmd/io.c +++ b/src/slurmd/io.c @@ -63,19 +63,6 @@ #include "src/slurmd/fname.h" #include "src/slurmd/slurmd.h" -struct incoming_client_info { - struct slurm_io_header header; - struct io_buf *msg; - int32_t remaining; - bool eof; -}; - -struct outgoing_fd_info { - List msg_queue; - struct io_buf *msg; - int32_t remaining; -}; - /********************************************************************** * IO client socket declarations **********************************************************************/ @@ -98,8 +85,17 @@ struct client_io_info { #endif slurmd_job_t *job; /* pointer back to job data */ - struct incoming_client_info in; - struct outgoing_fd_info out; + /* incoming variables */ + struct slurm_io_header header; + struct io_buf *in_msg; + int32_t in_remaining; + bool in_eof; + + /* outgoing variables */ + List msg_queue; + struct io_buf *out_msg; + int32_t out_remaining; + bool out_eof; }; /********************************************************************** @@ -108,19 +104,21 @@ struct client_io_info { static bool _task_writable(eio_obj_t *); static int _task_write(eio_obj_t *, List); -struct io_operations task_in_ops = { +struct io_operations task_write_ops = { writable: &_task_writable, handle_write: &_task_write, }; -struct task_in_info { +struct task_write_info { #ifndef NDEBUG #define TASK_IN_MAGIC 0x10103 int magic; #endif slurmd_job_t *job; /* pointer back to job data */ - struct outgoing_fd_info out; + List msg_queue; + struct io_buf *msg; + int32_t remaining; }; /********************************************************************** @@ -129,12 +127,12 @@ struct task_in_info { static bool _task_readable(eio_obj_t *); static int _task_read(eio_obj_t *, List); -struct io_operations task_out_ops = { +struct io_operations task_read_ops = { readable: &_task_readable, handle_read: &_task_read, }; -struct task_out_info { +struct task_read_info { #ifndef NDEBUG #define TASK_OUT_MAGIC 0x10103 int magic; @@ -153,14 +151,15 @@ struct task_out_info { **********************************************************************/ static void *_io_thr(void *); static int _send_io_init_msg(int sock, srun_key_t *key, slurmd_job_t *job); -static void _send_eof_msg(struct task_out_info *out); -static struct io_buf *_task_build_message(struct task_out_info *out, +static void _send_eof_msg(struct task_read_info *out); +static struct io_buf *_task_build_message(struct task_read_info *out, slurmd_job_t *job, cbuf_t cbuf); static struct io_obj *_io_obj(slurmd_job_t *, slurmd_task_info_t *, int, int); static void *_io_thr(void *arg); static void _route_msg_task_to_client(eio_obj_t *obj); static void _free_outgoing_msg(struct io_buf *msg, slurmd_job_t *job); static void _free_incoming_msg(struct io_buf *msg, slurmd_job_t *job); +static void _free_all_outgoing_msgs(List msg_queue, slurmd_job_t *job); /********************************************************************** * IO client socket functions @@ -173,7 +172,7 @@ _client_readable(eio_obj_t *obj) debug3("Called _client_readable"); xassert(client->magic == CLIENT_IO_MAGIC); - if (client->in.eof) { + if (client->in_eof) { debug3(" false"); return false; } @@ -181,10 +180,10 @@ _client_readable(eio_obj_t *obj) if (obj->shutdown) { debug3(" false, shutdown"); shutdown(obj->fd, SHUT_RD); - client->in.eof = true; + client->in_eof = true; } - if (client->in.msg != NULL + if (client->in_msg != NULL || !list_is_empty(client->job->free_incoming)) return true; @@ -200,15 +199,34 @@ _client_writable(eio_obj_t *obj) debug3("Called _client_writable"); xassert(client->magic == CLIENT_IO_MAGIC); - if (client->out.msg != NULL) - debug3(" client->out.msg != NULL"); + if (client->out_eof == true) { + debug3(" false, out_eof"); + return false; + } - if (!list_is_empty(client->out.msg_queue)) + /* If this is a newly attached client its msg_queue needs + * to be intialized from the outgoing_cache + */ + if (client->msg_queue == NULL) { + ListIterator msgs; + struct io_buf *msg; + client->msg_queue = list_create(NULL); /* need destructor */ + msgs = list_iterator_create(client->job->outgoing_cache); + while (msg = list_next(msgs)) { + msg->ref_count++; + list_enqueue(client->msg_queue, msg); + } + list_iterator_destroy(msgs); + } + + if (client->out_msg != NULL) + debug3(" client->out.msg != NULL"); + if (!list_is_empty(client->msg_queue)) debug3(" client->out.msg_queue queue length = %d", - list_count(client->out.msg_queue)); + list_count(client->msg_queue)); - if (client->out.msg != NULL - || !list_is_empty(client->out.msg_queue)) + if (client->out_msg != NULL + || !list_is_empty(client->msg_queue)) return true; debug3(" false"); @@ -219,48 +237,46 @@ static int _client_read(eio_obj_t *obj, List objs) { struct client_io_info *client = (struct client_io_info *) obj->arg; - struct incoming_client_info *in; void *buf; int n; debug2("Entering _client_read"); xassert(client->magic == CLIENT_IO_MAGIC); - in = &client->in; /* * Read the header, if a message read is not already in progress */ - if (in->msg == NULL) { - in->msg = list_dequeue(client->job->free_incoming); - if (in->msg == NULL) { + if (client->in_msg == NULL) { + client->in_msg = list_dequeue(client->job->free_incoming); + if (client->in_msg == NULL) { debug3(" _client_read free_incoming is empty"); return SLURM_SUCCESS; } - n = io_hdr_read_fd(obj->fd, &in->header); + n = io_hdr_read_fd(obj->fd, &client->header); if (n <= 0) { /* got eof or fatal error */ debug3(" got eof or error _client_read header, n=%d", n); - in->eof = true; - list_enqueue(client->job->free_incoming, in->msg); - in->msg = NULL; + client->in_eof = true; + list_enqueue(client->job->free_incoming, client->in_msg); + client->in_msg = NULL; return SLURM_SUCCESS; } - debug3("in->header.length = %d", in->header.length); - if (in->header.length > MAX_MSG_LEN) + debug3("client->header.length = %d", client->header.length); + if (client->header.length > MAX_MSG_LEN) fatal("Message length of %d exceeds maximum of %d", - in->header.length, MAX_MSG_LEN); - in->remaining = in->header.length; - in->msg->length = in->header.length; + client->header.length, MAX_MSG_LEN); + client->in_remaining = client->header.length; + client->in_msg->length = client->header.length; } /* * Read the body */ - if (in->header.length == 0) { /* zero length is an eof message */ + if (client->header.length == 0) { /* zero length is an eof message */ debug3(" got stdin eof message!"); } else { - buf = in->msg->data + (in->msg->length - in->remaining); + buf = client->in_msg->data + (client->in_msg->length - client->in_remaining); again: - if ((n = read(obj->fd, buf, in->remaining)) < 0) { + if ((n = read(obj->fd, buf, client->in_remaining)) < 0) { if (errno == EINTR) goto again; /* FIXME handle error */ @@ -268,13 +284,13 @@ _client_read(eio_obj_t *obj, List objs) } if (n == 0) { /* got eof */ debug3(" got eof on _client_read body"); - in->eof = true; - list_enqueue(client->job->free_incoming, in->msg); - in->msg = NULL; + client->in_eof = true; + list_enqueue(client->job->free_incoming, client->in_msg); + client->in_msg = NULL; return SLURM_SUCCESS; } - in->remaining -= n; - if (in->remaining > 0) + client->in_remaining -= n; + if (client->in_remaining > 0) return SLURM_SUCCESS; /* *(char *)(buf + n) = '\0'; */ /* debug3("\"%s\"", buf); */ @@ -283,40 +299,40 @@ _client_read(eio_obj_t *obj, List objs) /* * Route the message to its destination(s) */ - if (in->header.type != SLURM_IO_STDIN - && in->header.type != SLURM_IO_ALLSTDIN) { - error("Input in->header.type is not valid!"); - in->msg = NULL; + if (client->header.type != SLURM_IO_STDIN + && client->header.type != SLURM_IO_ALLSTDIN) { + error("Input client->header.type is not valid!"); + client->in_msg = NULL; return SLURM_ERROR; } else { int i; slurmd_task_info_t *task; - struct task_in_info *io; + struct task_write_info *io; - in->msg->ref_count = 0; - if (in->header.type == SLURM_IO_ALLSTDIN) { + client->in_msg->ref_count = 0; + if (client->header.type == SLURM_IO_ALLSTDIN) { for (i = 0; i < client->job->ntasks; i++) { task = client->job->task[i]; - io = (struct task_in_info *)(task->in->arg); - in->msg->ref_count++; - list_enqueue(io->out.msg_queue, in->msg); + io = (struct task_write_info *)task->in->arg; + client->in_msg->ref_count++; + list_enqueue(io->msg_queue, client->in_msg); } - debug3(" message ref_count = %d", in->msg->ref_count); + debug3(" message ref_count = %d", client->in_msg->ref_count); } else { for (i = 0; i < client->job->ntasks; i++) { task = client->job->task[i]; if (task->in == NULL) continue; - io = (struct task_in_info *)task->in->arg; - if (task->gtid != in->header.gtaskid) + io = (struct task_write_info *)task->in->arg; + if (task->gtid != client->header.gtaskid) continue; - in->msg->ref_count++; - list_enqueue(io->out.msg_queue, in->msg); + client->in_msg->ref_count++; + list_enqueue(io->msg_queue, client->in_msg); break; } } } - in->msg = NULL; + client->in_msg = NULL; debug2("Leaving _client_read"); return SLURM_SUCCESS; } @@ -328,49 +344,58 @@ static int _client_write(eio_obj_t *obj, List objs) { struct client_io_info *client = (struct client_io_info *) obj->arg; - struct outgoing_fd_info *out; void *buf; int n; xassert(client->magic == CLIENT_IO_MAGIC); debug2("Entering _client_write"); - out = &client->out; /* * If we aren't already in the middle of sending a message, get the * next message from the queue. */ - if (out->msg == NULL) { - out->msg = list_dequeue(out->msg_queue); - if (out->msg == NULL) { + if (client->out_msg == NULL) { + client->out_msg = list_dequeue(client->msg_queue); + if (client->out_msg == NULL) { debug3("_client_write: nothing in the queue"); return SLURM_SUCCESS; } - debug3(" dequeue successful, out->msg->length = %d", out->msg->length); - out->remaining = out->msg->length; + debug3(" dequeue successful, client->out_msg->length = %d", client->out_msg->length); + client->out_remaining = client->out_msg->length; } - debug3(" out->remaining = %d", out->remaining); + debug3(" client->out_remaining = %d", client->out_remaining); /* * Write message to socket. */ - buf = out->msg->data + (out->msg->length - out->remaining); + buf = client->out_msg->data + (client->out_msg->length - client->out_remaining); + debug3("made it here"); again: - if ((n = write(obj->fd, buf, out->remaining)) < 0) { + if ((n = write(obj->fd, buf, client->out_remaining)) < 0) { + debug3("made it here too"); if (errno == EINTR) goto again; - /* FIXME handle error */ - return SLURM_ERROR; + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + debug3("_client_write returned EAGAIN"); + return SLURM_SUCCESS; + } + if (errno == EPIPE) { + client->out_eof = true; + _free_all_outgoing_msgs(client->msg_queue, client->job); + return SLURM_SUCCESS; + } + error("Get error on write() in _client_write: %m"); + return SLURM_SUCCESS; } debug3("Wrote %d bytes to socket", n); - out->remaining -= n; - if (out->remaining > 0) + client->out_remaining -= n; + if (client->out_remaining > 0) return SLURM_SUCCESS; - _free_outgoing_msg(out->msg, client->job); - out->msg = NULL; + _free_outgoing_msg(client->out_msg, client->job); + client->out_msg = NULL; return SLURM_SUCCESS; } @@ -384,19 +409,19 @@ again: static eio_obj_t * _create_task_in_eio(int fd, slurmd_job_t *job) { - struct task_in_info *in = NULL; + struct task_write_info *t = NULL; eio_obj_t *eio = NULL; - in = (struct task_in_info *)xmalloc(sizeof(struct task_in_info)); + t = (struct task_write_info *)xmalloc(sizeof(struct task_write_info)); #ifndef NDEBUG - in->magic = TASK_IN_MAGIC; + t->magic = TASK_IN_MAGIC; #endif - in->job = job; - in->out.msg_queue = list_create(NULL); /* FIXME! Add destructor */ - in->out.msg = NULL; - in->out.remaining = 0; + t->job = job; + t->msg_queue = list_create(NULL); /* FIXME! Add destructor */ + t->msg = NULL; + t->remaining = 0; - eio = eio_obj_create(fd, &task_in_ops, (void *)in); + eio = eio_obj_create(fd, &task_write_ops, (void *)t); return eio; } @@ -404,70 +429,66 @@ _create_task_in_eio(int fd, slurmd_job_t *job) static bool _task_writable(eio_obj_t *obj) { - struct task_in_info *in = (struct task_in_info *) obj->arg; - struct outgoing_fd_info *out = &in->out; + struct task_write_info *t = (struct task_write_info *) obj->arg; debug3("Called _task_writable"); - if (out->msg != NULL || list_count(out->msg_queue) > 0) { - debug3(" true, list_count = %d", list_count(out->msg_queue)); + if (t->msg != NULL || list_count(t->msg_queue) > 0) { + debug3(" true, list_count = %d", list_count(t->msg_queue)); return true; } - debug3(" false (list_count = %d)", list_count(out->msg_queue)); + debug3(" false (list_count = %d)", list_count(t->msg_queue)); return false; } static int _task_write(eio_obj_t *obj, List objs) { - struct task_in_info *in = (struct task_in_info *) obj->arg; - struct outgoing_fd_info *out; + struct task_write_info *in = (struct task_write_info *) obj->arg; void *buf; int n; debug2("Entering _task_write"); xassert(in->magic == TASK_IN_MAGIC); - out = &in->out; - /* * If we aren't already in the middle of sending a message, get the * next message from the queue. */ - if (out->msg == NULL) { - out->msg = list_dequeue(out->msg_queue); - if (out->msg == NULL) { + if (in->msg == NULL) { + in->msg = list_dequeue(in->msg_queue); + if (in->msg == NULL) { debug3("_task_write: nothing in the queue"); return SLURM_SUCCESS; } - if (out->msg->length == 0) { /* eof message */ + if (in->msg->length == 0) { /* eof message */ close(obj->fd); obj->fd = -1; - _free_incoming_msg(out->msg, in->job); - out->msg = NULL; + _free_incoming_msg(in->msg, in->job); + in->msg = NULL; return SLURM_SUCCESS; } - out->remaining = out->msg->length; + in->remaining = in->msg->length; } /* * Write message to socket. */ - buf = out->msg->data + (out->msg->length - out->remaining); + buf = in->msg->data + (in->msg->length - in->remaining); again: - if ((n = write(obj->fd, buf, out->remaining)) < 0) { + if ((n = write(obj->fd, buf, in->remaining)) < 0) { if (errno == EINTR) goto again; /* FIXME handle error */ return SLURM_ERROR; } - out->remaining -= n; - if (out->remaining > 0) + in->remaining -= n; + if (in->remaining > 0) return SLURM_SUCCESS; - _free_incoming_msg(out->msg, in->job); - out->msg = NULL; + _free_incoming_msg(in->msg, in->job); + in->msg = NULL; return SLURM_SUCCESS; } @@ -482,10 +503,10 @@ static eio_obj_t * _create_task_out_eio(int fd, uint16_t type, slurmd_job_t *job, slurmd_task_info_t *task) { - struct task_out_info *out = NULL; + struct task_read_info *out = NULL; eio_obj_t *eio = NULL; - out = (struct task_out_info *)xmalloc(sizeof(struct task_out_info)); + out = (struct task_read_info *)xmalloc(sizeof(struct task_read_info)); #ifndef NDEBUG out->magic = TASK_OUT_MAGIC; #endif @@ -499,7 +520,7 @@ _create_task_out_eio(int fd, uint16_t type, if (cbuf_opt_set(out->buf, CBUF_OPT_OVERWRITE, CBUF_NO_DROP) == -1) error("setting cbuf options"); - eio = eio_obj_create(fd, &task_out_ops, (void *)out); + eio = eio_obj_create(fd, &task_read_ops, (void *)out); return eio; } @@ -507,7 +528,7 @@ _create_task_out_eio(int fd, uint16_t type, static bool _task_readable(eio_obj_t *obj) { - struct task_out_info *out = (struct task_out_info *)obj->arg; + struct task_read_info *out = (struct task_read_info *)obj->arg; debug3("Called _task_readable, task %d, %s", out->gtaskid, out->type == SLURM_IO_STDOUT ? "STDOUT" : "STDERR"); @@ -533,7 +554,7 @@ _task_readable(eio_obj_t *obj) static int _task_read(eio_obj_t *obj, List objs) { - struct task_out_info *out = (struct task_out_info *)obj->arg; + struct task_read_info *out = (struct task_read_info *)obj->arg; struct client_io_info *client; struct io_buf *msg = NULL; eio_obj_t *eio; @@ -552,7 +573,7 @@ again: if (errno == EINTR) goto again; if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { - error("_task_read returned EAGAIN"); + debug3("_task_read returned EAGAIN"); return SLURM_SUCCESS; } /* FIXME add error message */ @@ -736,10 +757,29 @@ _xclose(int fd) return rc; } +void +_shrink_msg_cache(List cache, slurmd_job_t *job) +{ + struct io_buf *msg; + int over = 0; + int count; + int i; + + count = list_count(cache); + if (count > STDIO_MAX_MSG_CACHE) + over = count - STDIO_MAX_MSG_CACHE; + + for (i = 0; i < over; i++) { + msg = list_dequeue(cache); + /* FIXME - following call MIGHT lead to too much recursion */ + _free_outgoing_msg(msg, job); + } +} + static void _route_msg_task_to_client(eio_obj_t *obj) { - struct task_out_info *out = (struct task_out_info *)obj->arg; + struct task_read_info *out = (struct task_read_info *)obj->arg; struct client_io_info *client; struct io_buf *msg = NULL; eio_obj_t *eio; @@ -759,12 +799,20 @@ _route_msg_task_to_client(eio_obj_t *obj) clients = list_iterator_create(out->job->clients); while(eio = list_next(clients)) { client = (struct client_io_info *)eio->arg; + if (client->out_eof == true) + continue; debug3("======================== Enqueued message"); xassert(client->magic == CLIENT_IO_MAGIC); - if (list_enqueue(client->out.msg_queue, msg)) + if (list_enqueue(client->msg_queue, msg)) msg->ref_count++; } list_iterator_destroy(clients); + + /* Update the outgoing message cache */ + if (list_enqueue(out->job->outgoing_cache, msg)) { + msg->ref_count++; + _shrink_msg_cache(out->job->outgoing_cache, out->job); + } } } @@ -813,6 +861,19 @@ _free_outgoing_msg(struct io_buf *msg, slurmd_job_t *job) } } +static void +_free_all_outgoing_msgs(List msg_queue, slurmd_job_t *job) +{ + ListIterator msgs; + struct io_buf *msg; + + msgs = list_iterator_create(msg_queue); + while(msg = list_next(msgs)) { + _free_outgoing_msg(msg, job); + } + list_iterator_destroy(msgs); +} + extern void io_close_task_fds(slurmd_job_t *job) { @@ -855,12 +916,10 @@ _io_thr(void *arg) /* A SIGHUP signal signals a reattach to the mgr thread. We need * to block SIGHUP from being delivered to this thread so the mgr * thread will see the signal. - * - * FIXME! It is conceivable that a SIGHUP could be delivered to - * this thread before we get a chance to block it. */ sigemptyset(&set); sigaddset(&set, SIGHUP); + sigaddset(&set, SIGPIPE); pthread_sigmask(SIG_BLOCK, &set, NULL); debug("IO handler started pid=%lu", (unsigned long) getpid()); @@ -870,22 +929,21 @@ _io_thr(void *arg) } /* - * create initial client obj for this job step + * Initiate a TCP connection back to a waiting client (e.g. srun). + * + * Create a new eio client object and wake up the eio engine so that + * it can see the new object. */ int -io_client_connect(slurmd_job_t *job) +io_client_connect(srun_info_t *srun, slurmd_job_t *job) { int i; - srun_info_t *srun; int sock = -1; struct client_io_info *client; eio_obj_t *obj; debug2 ("adding IO connection (logical node rank %d)", job->nodeid); - srun = list_peek(job->sruns); - xassert(srun != NULL); - if (srun->ioaddr.sin_addr.s_addr) { char host[256]; uint16_t port; @@ -915,7 +973,7 @@ io_client_connect(slurmd_job_t *job) client->magic = CLIENT_IO_MAGIC; #endif client->job = job; - client->out.msg_queue = list_create(NULL); /* FIXME! Need desctructor */ + client->msg_queue = NULL; /* initialized in _client_writable */ obj = eio_obj_create(sock, &client_ops, (void *)client); list_append(job->clients, (void *)obj); @@ -929,15 +987,6 @@ io_client_connect(slurmd_job_t *job) return SLURM_SUCCESS; } -int -io_new_clients(slurmd_job_t *job) -{ - return SLURM_ERROR; -#if 0 - return io_prepare_clients(job); -#endif -} - static int _send_io_init_msg(int sock, srun_key_t *key, slurmd_job_t *job) { @@ -989,7 +1038,7 @@ io_dup_stdio(slurmd_task_info_t *t) } static void -_send_eof_msg(struct task_out_info *out) +_send_eof_msg(struct task_read_info *out) { struct client_io_info *client; struct io_buf *msg = NULL; @@ -1021,7 +1070,7 @@ _send_eof_msg(struct task_out_info *out) client = (struct client_io_info *)eio->arg; debug3("======================== Enqueued message"); xassert(client->magic == CLIENT_IO_MAGIC); - if (list_enqueue(client->out.msg_queue, msg)) + if (list_enqueue(client->msg_queue, msg)) msg->ref_count++; } list_iterator_destroy(clients); @@ -1032,7 +1081,7 @@ _send_eof_msg(struct task_out_info *out) static struct io_buf * -_task_build_message(struct task_out_info *out, slurmd_job_t *job, cbuf_t cbuf) +_task_build_message(struct task_read_info *out, slurmd_job_t *job, cbuf_t cbuf) { struct io_buf *msg; char *ptr; diff --git a/src/slurmd/io.h b/src/slurmd/io.h index 2de8e7c780a1bca09d4d2f5a173eb9145ae48d59..4390ec4745981db88d2ae0a2c37968e1c588c3b0 100644 --- a/src/slurmd/io.h +++ b/src/slurmd/io.h @@ -31,6 +31,13 @@ #include "src/slurmd/slurmd_job.h" #include "src/common/eio.h" +/* + * The message cache uses up free message buffers, so STDIO_MAX_MSG_CACHE + * must be a number smaller than STDIO_MAX_FREE_BUF. + */ +#define STDIO_MAX_FREE_BUF 10 +#define STDIO_MAX_MSG_CACHE 5 + struct io_buf { int ref_count; uint32_t length; @@ -40,6 +47,14 @@ struct io_buf { struct io_buf *alloc_io_buf(void); void free_io_buf(struct io_buf *buf); +/* + * Initiate a TCP connection back to a waiting client (e.g. srun). + * + * Create a new eio client object and wake up the eio engine so that + * it can see the new object. + */ +int io_client_connect(srun_info_t *srun, slurmd_job_t *job); + /* * Initialize each task's standard I/O file descriptors. The file descriptors * may be files, or may be the end of a pipe which is handled by an eio_obj_t. @@ -54,13 +69,6 @@ int io_init_tasks_stdio(slurmd_job_t *job); */ int io_thread_start(slurmd_job_t *job); -/* - * Create a set of new connecting clients for the running job - * Grabs the latest srun object off the job's list of attached - * sruns, and duplicates stdout/err to this new client. - */ -int io_new_clients(slurmd_job_t *job); - int io_dup_stdio(slurmd_task_info_t *t); /* diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c index 52b2a0f20c16f845acbccfe7bf95ba68671a605f..2d271782783bb89b67e8d7bf20f9aa499dc30d06 100644 --- a/src/slurmd/mgr.c +++ b/src/slurmd/mgr.c @@ -456,8 +456,11 @@ _setup_io(slurmd_job_t *job) */ _slurmd_job_log_init(job); - if (!job->batch) - rc = io_client_connect(job); + if (!job->batch) { + srun_info_t *srun = list_peek(job->sruns); + xassert(srun != NULL); + rc = io_client_connect(srun, job); + } #ifndef NDEBUG # ifdef PR_SET_DUMPABLE @@ -580,9 +583,6 @@ _job_mgr(slurmd_job_t *job) goto fail1; } -/* xsignal_block(mgr_sigarray); */ -/* xsignal(SIGHUP, _hup_handler); */ - if (job->spawn_task) rc = _setup_spawn_io(job); else @@ -1198,6 +1198,7 @@ static void _handle_attach_req(slurmd_job_t *job) { srun_info_t *srun; + int rc; debug("handling attach request for %u.%u", job->jobid, job->stepid); @@ -1214,7 +1215,9 @@ _handle_attach_req(slurmd_job_t *job) list_prepend(job->sruns, (void *) srun); - io_new_clients(job); + rc = io_client_connect(srun, job); + if (rc == SLURM_ERROR) + error("Failed attaching new stdio client"); } diff --git a/src/slurmd/slurmd_job.c b/src/slurmd/slurmd_job.c index 5f24262e451c9fc01b282c03cbbdf38bccb4e989..77289a730dbb40ff041f9f9ec48b645948308454 100644 --- a/src/slurmd/slurmd_job.c +++ b/src/slurmd/slurmd_job.c @@ -179,13 +179,14 @@ job_create(launch_tasks_request_msg_t *msg, slurm_addr *cli_addr) job->stdout_eio_objs = list_create(NULL); /* FIXME! Needs destructor */ job->stderr_eio_objs = list_create(NULL); /* FIXME! Needs destructor */ job->free_incoming = list_create(NULL); /* FIXME! Needs destructor */ - for (i = 0; i < 10; i++) { + for (i = 0; i < STDIO_MAX_FREE_BUF; i++) { list_enqueue(job->free_incoming, alloc_io_buf()); } job->free_outgoing = list_create(NULL); /* FIXME! Needs destructor */ - for (i = 0; i < 10; i++) { + for (i = 0; i < STDIO_MAX_FREE_BUF; i++) { list_enqueue(job->free_outgoing, alloc_io_buf()); } + job->outgoing_cache = list_create(NULL); /* FIXME! Needs destructor */ job->envtp = xmalloc(sizeof(env_t)); job->envtp->jobid = -1; diff --git a/src/slurmd/slurmd_job.h b/src/slurmd/slurmd_job.h index 8b77520eb5b0e194f4bc8ac0db69c98132df8d4f..5f8534d75d1a35d16eaea907635d2c542a2014e6 100644 --- a/src/slurmd/slurmd_job.h +++ b/src/slurmd/slurmd_job.h @@ -138,6 +138,10 @@ typedef struct slurmd_job { * traffic "outgoing" means traffic from the * tasks to srun. */ + List outgoing_cache; /* cache of outgoing stdio messages + * used when a new client attaches + */ + uint8_t buffered_stdio; /* stdio buffering flag, 1 for line-buffering, * 0 for no buffering */