From a7f1e740e4dd9820fbb794e88b13777afeb43319 Mon Sep 17 00:00:00 2001 From: Mark Grondona <mgrondona@llnl.gov> Date: Wed, 22 Oct 2003 00:20:02 +0000 Subject: [PATCH] o introduce new io signal mechanism in the form of and "eio handle" o replace calls of pthread_kill() to IO thread with eio_handle_signal() o Try to avoid having obj->ops == 0x0 --- src/common/eio.c | 94 ++++++++++++++++++++++++++++++++++++++++++++---- src/common/eio.h | 9 ++++- src/slurmd/io.c | 18 +++++----- src/slurmd/job.c | 1 + src/slurmd/job.h | 1 + 5 files changed, 107 insertions(+), 16 deletions(-) diff --git a/src/common/eio.c b/src/common/eio.c index d8540a59913..9c8548b8961 100644 --- a/src/common/eio.c +++ b/src/common/eio.c @@ -28,18 +28,30 @@ #endif #include <sys/poll.h> +#include <unistd.h> #include <errno.h> #include "src/common/xmalloc.h" #include "src/common/xassert.h" #include "src/common/log.h" #include "src/common/list.h" +#include "src/common/fd.h" #include "src/common/eio.h" + +struct eio_handle_components { +#ifndef NDEBUG +# define EIO_MAGIC 0xe1e10 + int magic; +#endif + int fds[2]; +}; + + /* Function prototypes */ -static int _poll_loop_internal(List objs); +static int _poll_loop_internal(eio_t eio, List objs); static int _poll_internal(struct pollfd *pfds, unsigned int nfds); static unsigned int _poll_setup_pollfds(struct pollfd *, io_obj_t **, List); static void _poll_dispatch(struct pollfd *, unsigned int, io_obj_t **, @@ -47,13 +59,63 @@ static void _poll_dispatch(struct pollfd *, unsigned int, io_obj_t **, static void _poll_handle_event(short revents, io_obj_t *obj, List objList); -int io_handle_events(List objs) +eio_t eio_handle_create(void) { - return _poll_loop_internal(objs); + eio_t eio = xmalloc(sizeof(*eio)); + + if (pipe(eio->fds) < 0) { + error ("eio_create: pipe: %m"); + eio_handle_destroy(eio); + return (NULL); + } + + fd_set_nonblocking(eio->fds[0]); + + xassert(eio->magic = EIO_MAGIC); + + return eio; +} + +void eio_handle_destroy(eio_t eio) +{ + xassert(eio != NULL); + xassert(eio->magic == EIO_MAGIC); + close(eio->fds[0]); + close(eio->fds[1]); + xassert(eio->magic = ~EIO_MAGIC); + xfree(eio); +} + +int eio_handle_signal(eio_t eio) +{ + char c = 0; + if (write(eio->fds[1], &c, sizeof(char)) != 1) + return error("eio_signal: write; %m"); + return 0; +} + +static int _eio_clear(eio_t eio) +{ + char buf[1024]; + int rc = 0; + + while ((rc = (read(eio->fds[0], buf, 1024)) > 0)) {;} + + if (rc < 0) return error("eio_clear: read: %m"); + + return 0; +} + +int io_handle_events(eio_t eio, List objs) +{ + xassert (eio != NULL); + xassert (eio->magic == EIO_MAGIC); + + return _poll_loop_internal(eio, objs); } static int -_poll_loop_internal(List objs) +_poll_loop_internal(eio_t eio, List objs) { int retval = 0; struct pollfd *pollfds = NULL; @@ -66,8 +128,8 @@ _poll_loop_internal(List objs) /* Alloc memory for pfds and map if needed */ if (maxnfds < (n = list_count(objs))) { maxnfds = n; - xrealloc(pollfds, maxnfds*sizeof(struct pollfd)); - xrealloc(map, maxnfds*sizeof(io_obj_t * )); + xrealloc(pollfds, (maxnfds+1) * sizeof(struct pollfd)); + xrealloc(map, maxnfds * sizeof(io_obj_t * )); /* * Note: xrealloc() also handles initial malloc */ @@ -75,13 +137,31 @@ _poll_loop_internal(List objs) debug3("eio: handling events for %d objects", list_count(objs)); + /* + * Clear any pending eio signals + */ + _eio_clear(eio); + if ((nfds = _poll_setup_pollfds(pollfds, map, objs)) <= 0) goto done; + /* + * Setup eio handle poll fd + */ + pollfds[nfds].fd = eio->fds[0]; + pollfds[nfds].events = POLLIN; + nfds++; + + xassert(nfds <= maxnfds + 1); + + if (_poll_internal(pollfds, nfds) < 0) goto error; - _poll_dispatch(pollfds, nfds, map, objs); + if (pollfds[nfds-1].revents & POLLIN) + _eio_clear(eio); + + _poll_dispatch(pollfds, nfds-1, map, objs); } error: retval = -1; diff --git a/src/common/eio.h b/src/common/eio.h index e71c6981ae5..60862faae65 100644 --- a/src/common/eio.h +++ b/src/common/eio.h @@ -32,6 +32,8 @@ typedef struct io_obj io_obj_t; +typedef struct eio_handle_components * eio_t; + /* Possible I/O operations on an I/O object * Each takes the io_obj being operated on as an argument * @@ -61,6 +63,11 @@ struct io_obj { * * returns -1 on error. */ -int io_handle_events(List io_obj_list); +int io_handle_events(eio_t eio, List io_obj_list); + + +eio_t eio_handle_create(void); +void eio_handle_destroy(eio_t eio); +int eio_handle_signal(eio_t eio); #endif /* !_EIO_H */ diff --git a/src/slurmd/io.c b/src/slurmd/io.c index 4d2a2401af8..2a89440518b 100644 --- a/src/slurmd/io.c +++ b/src/slurmd/io.c @@ -340,7 +340,7 @@ io_close_all(slurmd_job_t *job) /* Signal IO thread to close appropriate * client connections */ - pthread_kill(job->ioid, SIGHUP); + eio_handle_signal(job->eio); } static void @@ -401,7 +401,7 @@ _io_thr(void *arg) { slurmd_job_t *job = (slurmd_job_t *) arg; debug("IO handler started pid=%lu", (unsigned long) getpid()); - io_handle_events(job->objs); + io_handle_events(job->eio, job->objs); debug("IO handler exited"); _handle_unprocessed_output(job); return (void *)1; @@ -572,9 +572,7 @@ io_prepare_clients(slurmd_job_t *job) retval = SLURM_ERROR; /* kick IO thread */ - debug3("sending sighup to io thread id %ld", (long) job->ioid); - if (pthread_kill(job->ioid, SIGHUP) < 0) - error("pthread_kill: %m"); + eio_handle_signal(job->eio); } return retval; @@ -584,7 +582,7 @@ io_prepare_clients(slurmd_job_t *job) * Try to open stderr connection for errors */ _io_add_connecting(job, job->task[0], srun, CLIENT_STDERR); - pthread_kill(job->ioid, SIGHUP); + eio_handle_signal(job->eio); return SLURM_FAILURE; } @@ -672,6 +670,7 @@ _io_client_attach(io_obj_t *client, io_obj_t *writer, struct io_info *dst = reader ? reader->arg : NULL; struct io_info *cli = client->arg; struct io_info *io; + struct io_operations *opsptr = NULL; xassert((src != NULL) || (dst != NULL)); xassert((src == NULL) || (src->magic == IO_MAGIC)); @@ -717,8 +716,9 @@ _io_client_attach(io_obj_t *client, io_obj_t *writer, io->obj->fd = client->fd; io->disconnected = 0; - xfree(io->obj->ops); + opsptr = io->obj->ops; io->obj->ops = _ops_copy(client->ops); + xfree(opsptr); /* * Delete old client which is now an empty vessel @@ -1242,13 +1242,15 @@ static void _do_attach(struct io_info *io) { task_info_t *t; + struct io_operations *opsptr; xassert(io != NULL); xassert(io->magic == IO_MAGIC); xassert(_isa_client(io)); - xfree(io->obj->ops); + opsptr = io->obj->ops; io->obj->ops = _ops_copy(&client_ops); + xfree(opsptr); t = io->task; diff --git a/src/slurmd/job.c b/src/slurmd/job.c index b4cf17afdb0..bcb6726e17a 100644 --- a/src/slurmd/job.c +++ b/src/slurmd/job.c @@ -140,6 +140,7 @@ job_create(launch_tasks_request_msg_t *msg, slurm_addr *cli_addr) #endif job->objs = list_create((ListDelF) io_obj_destroy); + job->eio = eio_handle_create(); srun = srun_info_create(msg->cred, &resp_addr, &io_addr); diff --git a/src/slurmd/job.h b/src/slurmd/job.h index ca3076a463d..2cfedcb9f60 100644 --- a/src/slurmd/job.h +++ b/src/slurmd/job.h @@ -120,6 +120,7 @@ typedef struct slurmd_job { struct passwd *pwd; /* saved passwd struct for user job */ task_info_t **task; /* list of task information pointers */ + eio_t eio; List objs; /* list of IO objects */ List sruns; /* List of sruns */ -- GitLab