From ebbea5a7a6591b6aee9c8eea7549616e93541a04 Mon Sep 17 00:00:00 2001 From: Mark Grondona <mgrondona@llnl.gov> Date: Mon, 23 Dec 2002 22:59:17 +0000 Subject: [PATCH] o common/eio.c : add close handler to io objects, call close handler on POLLHUP but no read method. o common/log.c : add pthread_atfork() calls to ensure sanity of log data structure after a fork() from a threaded program. o slurmd/io.c : set _obj_close as close handler for all objects o slurmd/shm.c : fix bug that caused lockfile to always go into /tmp o slurmd/mgr.c : reset log prefix to denote job id when logging to a file o srun/srun.c : do not call fatal() after having allocated nodes, use die() instead which prints an error then does goto cleanup; o srun/*.c : remove _debug vestigal var --- src/common/eio.c | 4 ++++ src/common/eio.h | 1 + src/common/log.c | 25 +++++++++++++++++++++---- src/slurmd/io.c | 28 +++++++++++++++++++++++----- src/slurmd/mgr.c | 4 ++++ src/slurmd/req.c | 3 +-- src/slurmd/shm.c | 15 +++++++++------ src/srun/io.c | 16 +++++++++++----- src/srun/io.h | 1 + src/srun/launch.c | 2 +- src/srun/msg.c | 5 +++-- src/srun/opt.c | 2 +- src/srun/opt.h | 1 - src/srun/srun.c | 37 ++++++++++++++++++++++++------------- 14 files changed, 104 insertions(+), 40 deletions(-) diff --git a/src/common/eio.c b/src/common/eio.c index 90ee0af120e..90083239ea5 100644 --- a/src/common/eio.c +++ b/src/common/eio.c @@ -179,10 +179,14 @@ _poll_handle_event(short revents, io_obj_t *obj, List objList) if ((*obj->ops->handle_error) (obj, objList) < 0) return; } + if (((revents & POLLIN) || (revents & POLLHUP)) && obj->ops->handle_read ) { (*obj->ops->handle_read ) (obj, objList); + } else if ((revents & POLLHUP) && obj->ops->handle_close) { + (*obj->ops->handle_close) (obj, objList); } + if ((revents & POLLOUT) && obj->ops->handle_write) { (*obj->ops->handle_write) (obj, objList); } diff --git a/src/common/eio.h b/src/common/eio.h index 3bab96e0722..e71c6981ae5 100644 --- a/src/common/eio.h +++ b/src/common/eio.h @@ -44,6 +44,7 @@ struct io_operations { int (*handle_read )(io_obj_t *, List); int (*handle_write)(io_obj_t *, List); int (*handle_error)(io_obj_t *, List); + int (*handle_close)(io_obj_t *, List); }; struct io_obj { diff --git a/src/common/log.c b/src/common/log.c index f8201a06471..1f197cba651 100644 --- a/src/common/log.c +++ b/src/common/log.c @@ -114,6 +114,22 @@ extern char * program_invocation_short_name; # define default_argv0 "" #endif + +/* + * pthread_atfork handlers: + */ +#ifdef WITH_PTHREADS +static void _atfork_prep() { if (log) slurm_mutex_lock(&log_lock); } +static void _atfork_parent() { if (log) slurm_mutex_unlock(&log_lock); } +static void _atfork_child() { if (log) slurm_mutex_init(&log_lock); } +# define atfork_install_handlers() \ + do { \ + pthread_atfork(_atfork_prep, _atfork_parent, _atfork_child); \ + } while (0) +#else +# define atfork_install_handlers() (NULL) +#endif + /* * Initialize log with * prog = program name to tag error messages with @@ -134,6 +150,7 @@ _log_init(char *prog, log_options_t opt, log_facility_t fac, char *logfile ) log->buf = NULL; log->fbuf = NULL; log->fpfx = NULL; + atfork_install_handlers(); } if (prog) { @@ -318,13 +335,13 @@ static char *vxstrfmt(const char *fmt, va_list ap) xslurm_strerrorcat(buf); break; - case 't': /* "%t" => locally preferred date/time */ + case 't': /* "%t" => locally preferred date/time*/ xstrftimecat(buf, "%x %X"); break; - case 'T': /* "%T" => "dd Mon yyyy hh:mm:ss off" */ + case 'T': /* "%T" => "dd Mon yyyy hh:mm:ss off" */ xstrftimecat(buf, "%a %d %b %Y %H:%M:%S %z"); break; - case 'M': /* "%M" => "Mon DD hh:mm:ss" */ + case 'M': /* "%M" => "Mon DD hh:mm:ss" */ xstrftimecat(buf, "%b %d %T"); break; case 's': /* "%s" => append string */ @@ -523,7 +540,7 @@ log_has_data() bool rc = false; slurm_mutex_lock(&log_lock); if (log->opt.buffered) - rc = (cbuf_used(log->buf) > 0); + rc = (cbuf_used(log->buf) > 0); slurm_mutex_unlock(&log_lock); return rc; } diff --git a/src/slurmd/io.c b/src/slurmd/io.c index a04e48168f4..c910c451c02 100644 --- a/src/slurmd/io.c +++ b/src/slurmd/io.c @@ -124,7 +124,6 @@ static int find_obj(void *obj, void *key); /* static int find_fd(void *obj, void *key); */ static bool _isa_client(struct io_info *io); static bool _isa_task(struct io_info *io); -static void _obj_close(io_obj_t *obj, List objs); static void _io_client_attach(io_obj_t *, io_obj_t *, io_obj_t *, List objList); static int _open_output_file(slurmd_job_t *job, task_info_t *t, @@ -155,6 +154,7 @@ static int _client_read(io_obj_t *, List); static int _task_error(io_obj_t *, List); static int _client_error(io_obj_t *, List); static int _connecting_write(io_obj_t *, List); +static int _obj_close(io_obj_t *, List); /* Task Output operations (TASK_STDOUT, TASK_STDERR) * These objects are never writable -- @@ -163,7 +163,8 @@ static int _connecting_write(io_obj_t *, List); struct io_operations task_out_ops = { readable: &_readable, handle_read: &_task_read, - handle_error: &_task_error + handle_error: &_task_error, + handle_close: &_obj_close }; /* Task Input operations (TASK_STDIN) @@ -173,6 +174,7 @@ struct io_operations task_in_ops = { writable: &_writable, handle_write: &_write, handle_error: &_task_error, + handle_close: &_obj_close }; /* Normal client operations (CLIENT_STDOUT, CLIENT_STDERR, CLIENT_STDIN) @@ -186,6 +188,7 @@ struct io_operations client_ops = { handle_read: &_client_read, handle_write: &_write, handle_error: &_client_error, + handle_close: &_obj_close }; @@ -198,7 +201,8 @@ struct io_operations client_ops = { struct io_operations connecting_client_ops = { writable: &_writable, handle_write: &_connecting_write, - handle_error: &_client_error + handle_error: &_client_error, + handle_close: &_obj_close }; /* @@ -303,6 +307,9 @@ io_close_all(slurmd_job_t *job) for (i = 0; i < job->ntasks; i++) _io_finalize(job->task[i]); + /* No more debug info will be recieved by client after this point + */ + debug("Closing debug channel"); close(STDERR_FILENO); /* Signal IO thread to close appropriate @@ -1087,7 +1094,7 @@ io_prepare_child(task_info_t *t) return SLURM_SUCCESS; } -static void +static int _obj_close(io_obj_t *obj, List objs) { struct io_info *io = (struct io_info *) obj->arg; @@ -1107,6 +1114,8 @@ _obj_close(io_obj_t *obj, List objs) _shutdown_task_obj(io); xassert(_validate_io_list(objs)); + + return SLURM_SUCCESS; } static bool @@ -1460,10 +1469,19 @@ static int _client_error(io_obj_t *obj, List objs) { struct io_info *io = (struct io_info *) obj->arg; + socklen_t size = sizeof(int); + int err = 0; xassert(io->magic == IO_MAGIC); - error("%s task %d", _io_str[io->type], io->id); + if (getsockopt(obj->fd, SOL_SOCKET, SO_ERROR, &err, &size) < 0) + error ("getsockopt: %m"); + + if (err) { + debug("task %d %s: poll error: %s", + io->id, _io_str[io->type], slurm_strerror(err)); + } + return 0; } diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c index d5ccce5ee4d..f9f84a21989 100644 --- a/src/slurmd/mgr.c +++ b/src/slurmd/mgr.c @@ -242,6 +242,10 @@ mgr_launch_batch_job(batch_job_launch_msg_t *msg, slurm_addr *cli) int status = 0; slurmd_job_t *job; char *batchdir; + char buf[256]; + + snprintf(buf, sizeof(buf), "[%d]", msg->job_id); + log_set_fpfx(buf); /* New process, so must reinit shm */ if ((rc = shm_init()) < 0) diff --git a/src/slurmd/req.c b/src/slurmd/req.c index 4c9f821ee68..6d3d5184fbe 100644 --- a/src/slurmd/req.c +++ b/src/slurmd/req.c @@ -112,6 +112,7 @@ _launch_batch_job(batch_job_launch_msg_t *req, slurm_addr *cli) pid_t pid; int rc; + switch ((pid = fork())) { case -1: error("launch_tasks: fork: %m"); @@ -123,7 +124,6 @@ _launch_batch_job(batch_job_launch_msg_t *req, slurm_addr *cli) destroy_credential_state_list(conf->cred_state_list); slurm_destroy_ssl_key_ctx(&conf->vctx); slurm_ssl_destroy(); - log_reinit(); rc = mgr_launch_batch_job(req, cli); exit(rc); /* NOTREACHED */ @@ -155,7 +155,6 @@ _launch_tasks(launch_tasks_request_msg_t *req, slurm_addr *cli) destroy_credential_state_list(conf->cred_state_list); slurm_destroy_ssl_key_ctx(&conf->vctx); slurm_ssl_destroy(); - log_reinit(); rc = mgr_launch_tasks(req, cli); exit(rc); /* NOTREACHED */ diff --git a/src/slurmd/shm.c b/src/slurmd/shm.c index a8d6c0b2292..6e46648e094 100644 --- a/src/slurmd/shm.c +++ b/src/slurmd/shm.c @@ -276,7 +276,10 @@ _create_ipc_name(const char *name) #if defined(POSIX_IPC_PREFIX) && defined(HAVE_POSIX_SEMS) dir = POSIX_IPC_PREFIX; #else - if (!(dir = conf->spooldir) || !(dir = getenv("TMPDIR")) || !strlen(dir)) + dir = conf->spooldir; + if ( !(dir = conf->spooldir) + && !(strlen(dir)) + && !(dir = getenv("TMPDIR"))) dir = "/tmp"; #endif /* POSIX_IPC_PREFIX */ @@ -567,7 +570,7 @@ shm_update_step_addrs(uint32_t jobid, uint32_t stepid, job_step_t *s = &slurmd_shm->step[i]; /* Only allow one addr update at a time */ - if (1 || !s->io_update) { + if ( 1 || !s->io_update) { s->ioaddr = *ioaddr; s->respaddr = *respaddr; memcpy(s->key.data, keydata, SLURM_KEY_SIZE); @@ -890,19 +893,19 @@ _shm_lock_and_initialize() && (shm_lock != SEM_FAILED)) { /* we've already opened shared memory */ _shm_lock(); - if (attach_pid != getpid()) { - attach_pid = getpid(); - slurmd_shm->users++; - } + attach_pid = getpid(); + slurmd_shm->users++; _shm_unlock(); return SLURM_SUCCESS; } shm_lock = _sem_open(SHM_LOCKNAME, O_CREAT|O_EXCL, 0600, 0); + debug3("lockname is `%s'", lockname); if (shm_lock != SEM_FAILED) /* lock didn't exist. Create shmem */ return _shm_new(); else /* lock exists. Attach to shared memory */ return _shm_reopen(); + } static void diff --git a/src/srun/io.c b/src/srun/io.c index 85111d6c194..f6316417747 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -483,13 +483,13 @@ _fopen(char *filename) xassert(filename != NULL); if (!(fp = fopen(filename, "w"))) - error ("Unable to open file `%s' for writing: %m", filename); + error ("Unable to open `%s' for writing: %m", filename); return fp; } -static void -_open_streams(job_t *job) +int +open_streams(job_t *job) { if ((job->ifname->type != IO_PER_TASK) && job->ifname->name) job->stdinfd = _stdin_open(job->ifname->name); @@ -506,6 +506,10 @@ _open_streams(job_t *job) else job->errstream = stderr; + if (!job->outstream || !job->errstream || (job->stdinfd < 0)) + return -1; + + return 0; } @@ -542,11 +546,13 @@ io_thr_create(job_t *job) net_set_low_water(job->iofd[i], 140); } - _open_streams(job); + if (open_streams(job) < 0) { + return SLURM_ERROR; + } pthread_attr_init(&attr); if ((errno = pthread_create(&job->ioid, &attr, &io_thr, (void *) job))) - fatal("Unable to create io thread: %m"); + return SLURM_ERROR; debug("Started IO server thread (%d)", job->ioid); diff --git a/src/srun/io.h b/src/srun/io.h index 0b771bd4bbb..db32a656375 100644 --- a/src/srun/io.h +++ b/src/srun/io.h @@ -10,5 +10,6 @@ void *io_thr(void *arg); int io_thr_create(job_t *job); void report_job_status(job_t *job); void report_task_status(job_t *job); +int open_streams(job_t *job); #endif /* !_HAVE_IO_H */ diff --git a/src/srun/launch.c b/src/srun/launch.c index b382e56e6da..d009fbddb87 100644 --- a/src/srun/launch.c +++ b/src/srun/launch.c @@ -264,7 +264,7 @@ static void * _p_launch_task(void *args) int host_inx = msg_ptr->srun_node_id; int failure = 0; - if (_verbose || _debug) + if (_verbose) _print_launch_msg(msg_ptr, job_ptr->host[host_inx]); if (slurm_send_only_node_msg(req_ptr) < 0) { /* Has timeout */ error("task launch error on %s: %m", job_ptr->host[host_inx]); diff --git a/src/srun/msg.c b/src/srun/msg.c index 783317760a9..35d13f59ae9 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -308,8 +308,9 @@ _reattach_handler(job_t *job, slurm_msg_t *msg) job->jobid, slurm_strerror(resp->return_code)); update_job_state(job, SRUN_JOB_FAILED); } else { - error ("Unable to attach to step %d.%d on node %d", - job->jobid, job->stepid, resp->srun_node_id); + error ("Unable to attach to step %d.%d on node %d: %s", + job->jobid, job->stepid, resp->srun_node_id, + slurm_strerror(resp->return_code)); } } diff --git a/src/srun/opt.c b/src/srun/opt.c index db753701242..1cf1a38fc0b 100644 --- a/src/srun/opt.c +++ b/src/srun/opt.c @@ -1097,7 +1097,7 @@ void _opt_list() info("distribution : %s", format_distribution_t(opt.distribution)); info("core format : %s", opt.core_format); info("verbose : %d", _verbose); - info("debug : %d", _debug); + info("slurmd_debug : %d", opt.slurmd_debug); info("immediate : %s", tf_(opt.immediate)); info("label output : %s", tf_(opt.labelio)); info("allocate : %s", tf_(opt.allocate)); diff --git a/src/srun/opt.h b/src/srun/opt.h index acdde91a3ad..6a90bc44da6 100644 --- a/src/srun/opt.h +++ b/src/srun/opt.h @@ -56,7 +56,6 @@ /* global variables relating to user options */ char **remote_argv; int remote_argc; -int _debug; int _verbose; /* mutually exclusive modes for srun */ diff --git a/src/srun/srun.c b/src/srun/srun.c index fd8a2585592..f0e429f10c8 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -119,6 +119,11 @@ static void _p_fwd_signal(slurm_msg_t *req_array_ptr, job_t *job); static void *_p_signal_task(void *args); static int _set_batch_script_env(uint32_t jobid); +#define die(msg, args...) do { \ + error(msg, ##args); \ + goto cleanup; \ + } while (0) + #ifdef HAVE_LIBELAN3 # include "src/common/qsw.h" static void _qsw_standalone(job_t *job); @@ -151,9 +156,8 @@ srun(int ac, char **av) /* reinit log with new verbosity (if changed by command line) */ - if (_verbose || _debug) { - if (_verbose) - logopt.stderr_level+=_verbose; + if (_verbose) { + logopt.stderr_level+=_verbose; logopt.prefix_level = 1; log_alter(logopt, 0, NULL); } @@ -182,7 +186,7 @@ srun(int ac, char **av) } else if (opt.allocate) { if ( !(resp = _allocate_nodes()) ) exit(1); - if (_verbose || _debug) + if (_verbose) _print_job_information(resp); job = job_create_allocation(resp); _run_job_script(resp->job_id); @@ -198,7 +202,7 @@ srun(int ac, char **av) } else { if ( !(resp = _allocate_nodes()) ) exit(1); - if (_verbose || _debug) + if (_verbose) _print_job_information(resp); job = job_create_allocation(resp); @@ -213,7 +217,7 @@ srun(int ac, char **av) sigaddset(&sigset, SIGTSTP); sigaddset(&sigset, SIGSTOP); if (sigprocmask(SIG_BLOCK, &sigset, NULL) != 0) - fatal("sigprocmask: %m"); + die("sigprocmask: %m"); action.sa_handler = &_sigterm_handler; action.sa_flags = 0; sigaction(SIGTERM, &action, NULL); @@ -221,22 +225,22 @@ srun(int ac, char **av) /* job structure should now be filled in */ if (msg_thr_create(job) < 0) - fatal("Unable to create msg thread"); + die("Unable to create msg thread"); if (io_thr_create(job) < 0) - fatal("Unable to create IO thread"); + die("failed to initialize IO"); pthread_attr_init(&attr); /* spawn signal thread */ if ((errno = pthread_create(&job->sigid, &attr, &_sig_thr, (void *) job)) != 0) - fatal("Unable to create signal thread. %m"); + die("Unable to create signal thread. %m"); debug("Started signals thread (%d)", job->sigid); /* launch jobs */ if ((errno = pthread_create(&job->lid, &attr, &launch, (void *) job))) - fatal("Unable to create launch thread. %m"); + die("Unable to create launch thread. %m"); debug("Started launch thread (%d)", job->lid); /* wait for job to terminate */ @@ -268,6 +272,7 @@ srun(int ac, char **av) /* kill signal thread */ pthread_cancel(job->sigid); + cleanup: if (old_job) { debug("cancelling job step %u.%u", job->jobid, job->stepid); slurm_complete_job_step(job->jobid, job->stepid, 0, 0); @@ -631,7 +636,7 @@ static void _p_fwd_signal(slurm_msg_t *req_array_ptr, job_t *job) task_info_ptr->host_inx = i; if (pthread_attr_init (&thread_ptr[i].attr)) - fatal ("pthread_attr_init error %m"); + error ("pthread_attr_init error %m"); if (pthread_attr_setdetachstate (&thread_ptr[i].attr, PTHREAD_CREATE_DETACHED)) error ("pthread_attr_setdetachstate error %m"); @@ -968,6 +973,12 @@ _set_batch_script_env(uint32_t jobid) return -1; } + if ((opt.slurmd_debug) + && setenvf("SLURMD_DEBUG=%d", opt.slurmd_debug)) { + error("Can't set SLURMD_DEBUG environment variable"); + return -1; + } + return 0; } @@ -1005,12 +1016,12 @@ static void _run_job_script (uint32_t jobid) } /* spawn the shell with arguments (if any) */ - if (_verbose || _debug) + if (_verbose) info ("Spawning srun shell %s", shell); switch ( (child = fork()) ) { case -1: - fatal("Fork error %m"); + error ("Fork error %m"); case 0: execv(shell, remote_argv); fatal("exec error %m"); -- GitLab