diff --git a/src/common/log.c b/src/common/log.c index 47dafb86e4eba5924f792e40e4df7594859154b4..f8201a064719e79a9669ac07f26cf0e4f9b44ad1 100644 --- a/src/common/log.c +++ b/src/common/log.c @@ -87,7 +87,10 @@ */ typedef struct { char *argv0; - FILE *logfp; + char *fpfx; /* optional prefix for logfile entries */ + FILE *logfp; /* log file pointer */ + cbuf_t buf; /* stderr data buffer */ + cbuf_t fbuf; /* logfile data buffer */ log_facility_t facility; log_options_t opt; unsigned initialized:1; @@ -97,7 +100,7 @@ typedef struct { #ifdef WITH_PTHREADS static pthread_mutex_t log_lock; #else - static int log_lock; + static int log_lock; #endif /* WITH_PTHREADS */ static log_t *log = NULL; @@ -128,6 +131,9 @@ _log_init(char *prog, log_options_t opt, log_facility_t fac, char *logfile ) log = (log_t *)xmalloc(sizeof(log_t)); log->logfp = NULL; log->argv0 = NULL; + log->buf = NULL; + log->fbuf = NULL; + log->fpfx = NULL; } if (prog) { @@ -138,8 +144,21 @@ _log_init(char *prog, log_options_t opt, log_facility_t fac, char *logfile ) log->argv0 = xstrdup(default_argv0); } + if (!log->fpfx) + log->fpfx = xstrdup(""); + log->opt = opt; + if (log->opt.buffered) { + log->buf = cbuf_create(128, 8192); + log->fbuf = cbuf_create(128, 8192); + } else { + if (log->buf) + cbuf_destroy(log->buf); + if (log->fbuf) + cbuf_destroy(log->fbuf); + } + if (log->opt.syslog_level > LOG_LEVEL_QUIET) log->facility = fac; @@ -153,7 +172,7 @@ _log_init(char *prog, log_options_t opt, log_facility_t fac, char *logfile ) slurm_mutex_unlock(&log_lock); xslurm_strerrorcat(errmsg); fprintf(stderr, - "%s: log_init(): Unable to open logfile" + "%s: log_init(): Unable to open logfile" "`%s': %s\n", prog, logfile, errmsg); xfree(errmsg); rc = errno; @@ -181,11 +200,45 @@ int log_init(char *prog, log_options_t opt, log_facility_t fac, char *logfile) return rc; } +void log_fini() +{ + log_flush(); + slurm_mutex_lock(&log_lock); + if (log->argv0) + xfree(log->argv0); + if (log->fpfx) + xfree(log->argv0); + if (log->logfp) + fclose(log->logfp); + if (log->buf) + cbuf_destroy(log->buf); + if (log->fbuf) + cbuf_destroy(log->fbuf); + xfree(log); + log = NULL; + slurm_mutex_unlock(&log_lock); + slurm_mutex_destroy(&log_lock); +} + void log_reinit() { slurm_mutex_init(&log_lock); } +void log_set_fpfx(char *prefix) +{ + slurm_mutex_lock(&log_lock); + if (log->fpfx) + xfree(log->fpfx); + if (!prefix) + log->fpfx = xstrdup(""); + else { + log->fpfx = xstrdup(prefix); + xstrcatchar(log->fpfx, ' '); + } + slurm_mutex_unlock(&log_lock); +} + /* reinitialize log data structures. Like log_init, but do not init * the log mutex */ @@ -337,6 +390,26 @@ static void xlogfmtcat(char **dst, const char *fmt, ...) } +static void +_log_printf(cbuf_t cb, FILE *stream, const char *fmt, ...) +{ + va_list ap; + + va_start(ap, fmt); + if (log->opt.buffered && (cb != NULL)) { + char *buf = vxstrfmt(fmt, ap); + int len = strlen(buf); + int dropped; + cbuf_write(cb, buf, len, &dropped); + cbuf_read_to_fd(cb, fileno(stream), -1); + xfree(buf); + } else { + vfprintf(stream, fmt, ap); + } + va_end(ap); + +} + /* * log a message at the specified level to facilities that have been * configured to receive messages at that level @@ -381,7 +454,7 @@ static void log_msg(log_level_t level, const char *fmt, va_list args) case LOG_LEVEL_DEBUG: priority = LOG_DEBUG; - pfx = "debug: "; + pfx = "debug : "; break; case LOG_LEVEL_DEBUG2: @@ -408,19 +481,22 @@ static void log_msg(log_level_t level, const char *fmt, va_list args) if (level <= log->opt.stderr_level) { fflush(stdout); if (strlen(buf) > 0 && buf[strlen(buf) - 1] == '\n') - fprintf(stderr, "%s: %s%s", log->argv0, pfx, buf); + _log_printf( log->buf, stderr, "%s: %s%s", + log->argv0, pfx, buf); else - fprintf(stderr, "%s: %s%s\n", log->argv0, pfx, buf); + _log_printf( log->buf, stderr, "%s: %s%s\n", + log->argv0, pfx, buf); fflush(stderr); } if (level <= log->opt.logfile_level && log->logfp != NULL) { - xlogfmtcat(&msgbuf, "[%M] %s%s", pfx, buf); + xlogfmtcat(&msgbuf, "[%M] %s%s%s", + log->fpfx, pfx, buf); if (strlen(buf) > 0 && buf[strlen(buf) - 1] == '\n') - fprintf(log->logfp, "%s", msgbuf); + _log_printf(log->fbuf, log->logfp, "%s", msgbuf); else - fprintf(log->logfp, "%s\n", msgbuf); + _log_printf(log->fbuf, log->logfp, "%s\n", msgbuf); fflush(log->logfp); xfree(msgbuf); @@ -441,6 +517,34 @@ static void log_msg(log_level_t level, const char *fmt, va_list args) xfree(buf); } +bool +log_has_data() +{ + bool rc = false; + slurm_mutex_lock(&log_lock); + if (log->opt.buffered) + rc = (cbuf_used(log->buf) > 0); + slurm_mutex_unlock(&log_lock); + return rc; +} + +void +log_flush() +{ + slurm_mutex_lock(&log_lock); + + if (!log->opt.buffered) + goto done; + + if (log->opt.stderr_level) + cbuf_read_to_fd(log->buf, fileno(stderr), -1); + else if (log->logfp) + cbuf_read_to_fd(log->buf, fileno(log->logfp), -1); + + done: + slurm_mutex_unlock(&log_lock); +} + /* LLNL Software development Toolbox (LSD-Tools) * fatal() and nomem() functions */ @@ -468,10 +572,13 @@ void fatal(const char *fmt, ...) va_start(ap, fmt); log_msg(LOG_LEVEL_FATAL, fmt, ap); va_end(ap); + log_flush(); fatal_cleanup(); -#ifndef NDEBUG +#ifndef NDEBUG abort(); +#else + exit(1); #endif } @@ -675,3 +782,4 @@ dump_cleanup_list(void) } slurm_mutex_unlock(&fatal_lock); } + diff --git a/src/common/log.h b/src/common/log.h index 58b8e672be7b45d9baec80d3e4a69a9024649398..a375dfca2567f84f070d1ed903eb2674d153b4ff 100644 --- a/src/common/log.h +++ b/src/common/log.h @@ -49,6 +49,7 @@ #include <stdio.h> #include "src/common/macros.h" +#include "src/common/cbuf.h" /* supported syslog facilities and levels */ typedef enum { @@ -73,7 +74,7 @@ typedef enum { * QUIET disable logging completely. */ typedef enum { - LOG_LEVEL_QUIET, + LOG_LEVEL_QUIET = 0, LOG_LEVEL_FATAL, LOG_LEVEL_ERROR, LOG_LEVEL_INFO, @@ -88,22 +89,23 @@ typedef enum { * log options: Each of stderr, syslog, and logfile can have a different level */ typedef struct { - unsigned prefix_level; /* prefix level (e.g. "debug: ") if 1 */ log_level_t stderr_level; /* max level to log to stderr */ log_level_t syslog_level; /* max level to log to syslog */ log_level_t logfile_level; /* max level to log to logfile */ + unsigned prefix_level:1; /* prefix level (e.g. "debug: ") if 1 */ + unsigned buffered:1; /* Use internal buffer to never block */ } log_options_t; /* some useful initializers for log_options_t */ #define LOG_OPTS_INITIALIZER \ - { 1, LOG_LEVEL_INFO, LOG_LEVEL_INFO, LOG_LEVEL_INFO } + { LOG_LEVEL_INFO, LOG_LEVEL_INFO, LOG_LEVEL_INFO, 1, 0 } #define LOG_OPTS_SYSLOG_DEFAULT \ - { 1, LOG_LEVEL_QUIET, LOG_LEVEL_INFO, LOG_LEVEL_QUIET } + { LOG_LEVEL_QUIET, LOG_LEVEL_INFO, LOG_LEVEL_QUIET, 1, 0 } #define LOG_OPTS_STDERR_ONLY \ - { 1, LOG_LEVEL_INFO, LOG_LEVEL_QUIET, LOG_LEVEL_QUIET } + { LOG_LEVEL_INFO, LOG_LEVEL_QUIET, LOG_LEVEL_QUIET, 1, 0 } /* * initialize log module (called only once) @@ -131,6 +133,11 @@ int log_init(char *argv0, log_options_t opts, */ void log_reinit(void); +/* + * Close log and free associated memory + */ +void log_fini(void); + /* Alter log facility, options are like log_init() above, except that * an argv0 argument is not passed. * @@ -138,11 +145,30 @@ void log_reinit(void); */ int log_alter(log_options_t opts, log_facility_t fac, char *logfile); +/* Set prefix for log file entries + * (really only useful for slurmd at this point) + */ +void log_set_fpfx(char *pfx); + /* grab the FILE * of the current logfile (or stderr if not logging to * a file) */ FILE *log_fp(void); +/* + * Buffered log functions: + * + * log_has_data() returns true if there is data in the + * internal log buffer + */ +bool log_has_data(void); + +/* + * log_flush() attempts to flush all data in the internal + * log buffer to the appropriate output stream. + */ +void log_flush(void); + /* * the following log a message to the log facility at the appropriate level: * diff --git a/src/common/slurm_errno.c b/src/common/slurm_errno.c index 02899b6e4fb074cb6325ae0094529ffe084147cb..ca7bec06c22dd80a32163be1da41d9fe1796e296 100644 --- a/src/common/slurm_errno.c +++ b/src/common/slurm_errno.c @@ -221,11 +221,11 @@ static slurm_errtab_t slurm_errtab[] = { { ESLURMD_SHARED_MEMORY_ERROR, "Slurmd shared memory error" }, { ESLURMD_SET_UID_OR_GID_ERROR, - "Slurmd could not set UID or GID for batch job" }, + "Slurmd could not set UID or GID" }, { ESLURMD_SET_SID_ERROR, - "Slurmd could not set session ID for batch job" }, + "Slurmd could not set session ID" }, { ESLURMD_CANNOT_SPAWN_IO_THREAD, - "Slurmd could not spawn I/O thread handler" }, + "Slurmd could not spawn I/O thread" }, { ESLURMD_FORK_FAILED, "Slurmd could not fork batch job" }, { ESLURMD_EXECVE_FAILED, diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c index 1f2b19dcf5300941128b5e14bc1c5487f030c6e1..e3e64cbe394c071bfeab62d94297c6721d5120e0 100644 --- a/src/slurmctld/controller.c +++ b/src/slurmctld/controller.c @@ -74,9 +74,8 @@ #endif /* !MAX */ /* Log to stderr and syslog until becomes a daemon */ -log_options_t log_opts = { 1, LOG_LEVEL_INFO, LOG_LEVEL_INFO, - LOG_LEVEL_QUIET -}; +log_options_t log_opts = + { LOG_LEVEL_INFO, LOG_LEVEL_INFO, LOG_LEVEL_QUIET, 1, 0 }; /* Global variables */ slurm_ctl_conf_t slurmctld_conf; diff --git a/src/slurmd/elan_interconnect.c b/src/slurmd/elan_interconnect.c index 4312d18204a2abc5239d20686d0b2e890e6424f7..b0d0911258c13f90b613537359c4561ac0b60550 100644 --- a/src/slurmd/elan_interconnect.c +++ b/src/slurmd/elan_interconnect.c @@ -76,6 +76,8 @@ _wait_and_destroy_prg(qsw_jobinfo_t qsw_job, pid_t pid) sleep(sleeptime*=2); } + debug("destroyed program description"); + exit(0); return SLURM_SUCCESS; } @@ -97,7 +99,8 @@ interconnect_init(slurmd_job_t *job) case 0: /* child falls thru */ break; default: /* parent */ - return _wait_and_destroy_prg(job->qsw_job, pid); + _wait_and_destroy_prg(job->qsw_job, pid); + /*NOTREACHED*/ } /* Process 2: */ diff --git a/src/slurmd/io.c b/src/slurmd/io.c index 5d7117ccca44695f3d83c876ce34a96ab72a5700..a04e48168f4acdc8b197111c1a8a918a816d47ed 100644 --- a/src/slurmd/io.c +++ b/src/slurmd/io.c @@ -112,6 +112,7 @@ struct io_info { static bool _isa_client(struct io_info *io); static bool _isa_task(struct io_info *io); +static void _fatal_cleanup(void *); static int _io_init_pipes(task_info_t *t); static int _io_prepare_tasks(slurmd_job_t *); static void * _io_thr(void *); @@ -235,6 +236,8 @@ io_spawn_handler(slurmd_job_t *job) pthread_create(&job->ioid, &attr, &_io_thr, (void *)job); + fatal_add_cleanup(&_fatal_cleanup, (void *) job); + return 0; } @@ -242,13 +245,13 @@ static int _xclose(int fd) { int rc; - do rc = close(fd); - while (rc == -1 && errno == EINTR); + do { + rc = close(fd); + } while (rc == -1 && errno == EINTR); return rc; } - /* * Close child fds in parent as well as * any stdin io objs in job->objs @@ -308,6 +311,27 @@ io_close_all(slurmd_job_t *job) pthread_kill(job->ioid, SIGHUP); } +static void +_fatal_cleanup(void *arg) +{ + slurmd_job_t *job = (slurmd_job_t *) arg; + ListIterator i; + io_obj_t *obj; + struct io_info *io; + + error("in fatal_cleanup"); + + _task_read(job->task[0]->err, job->objs); + + i = list_iterator_create(job->objs); + while((obj = list_next(i))) { + io = (struct io_info *) obj->arg; + if ((*obj->ops->writable)(obj)) + _write(obj, job->objs); + } + list_iterator_destroy(i); +} + static void _handle_unprocessed_output(slurmd_job_t *job) { @@ -326,9 +350,8 @@ _handle_unprocessed_output(slurmd_job_t *job) continue; if (io->buf && (n = cbuf_used(io->buf))) - job_error(job, - "task %d: %ld bytes of stdout unprocessed", - io->id, n); + error("task %d: %ld bytes of stdout unprocessed", + io->id, n); if (!(readers = ((struct io_info *)t->err->arg)->readers)) continue; @@ -336,7 +359,8 @@ _handle_unprocessed_output(slurmd_job_t *job) continue; if (io->buf && (n = cbuf_used(io->buf))) - job_error(job, "%ld bytes of stderr unprocessed", n); + error("task %d: %ld bytes of stderr unprocessed", + io->id, n); } } @@ -1114,6 +1138,9 @@ _writable(io_obj_t *obj) && !io->disconnected && ((cbuf_used(io->buf) > 0) || io->eof)); + if ((io->type == CLIENT_STDERR) && (io->id == 0)) + rc = (rc || log_has_data()); + if (rc) debug3("%d %s is writable", io->id, _io_str[io->type]); @@ -1132,6 +1159,9 @@ _write(io_obj_t *obj, List objs) if (io->disconnected) return 0; + if (io->id == 0) + log_flush(); + debug3("Need to write %ld bytes to %s %d", cbuf_used(io->buf), _io_str[io->type], io->id); diff --git a/src/slurmd/job.h b/src/slurmd/job.h index 199b790313e1b25c6d46de1c317e700aa762b14a..f8f51ab076bb317ec368e069ebbb7c749f5ef496 100644 --- a/src/slurmd/job.h +++ b/src/slurmd/job.h @@ -129,34 +129,4 @@ void job_update_shm(slurmd_job_t *job); void job_delete_shm(slurmd_job_t *job); -#define job_error(j, fmt, args...) \ - do { \ - error("%d.%d: " fmt, (j)->jobid, (j)->stepid, ## args); \ - } while (0) - -#define job_verbose(j, fmt, args...) \ - do { \ - verbose("%d.%d: " fmt, (j)->jobid, (j)->stepid, ## args); \ - } while (0) - -#define job_debug(j, fmt, args...) \ - do { \ - debug("%d.%d: " fmt, (j)->jobid, (j)->stepid, ## args); \ - } while (0) - -#define job_debug2(j, fmt, args...) \ - do { \ - debug2("%d.%d: " fmt, (j)->jobid, (j)->stepid, ## args); \ - } while (0) - -#define job_debug3(j, fmt, args...) \ - do { \ - debug3("%d.%d: " fmt, (j)->jobid, (j)->stepid, ## args); \ - } while (0) - -#define job_info(j, fmt, args...) \ - do { \ - info("%d.%d: " fmt, (j)->jobid, (j)->stepid, ## args); \ - } while (0) - #endif /* !_JOB_H */ diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c index 27290ed5b32157bf240176652d11abb895670173..e27079a1a0137fb4a6880785e51dcf73d50717cd 100644 --- a/src/slurmd/mgr.c +++ b/src/slurmd/mgr.c @@ -48,6 +48,7 @@ #include "src/common/cbuf.h" #include "src/common/hostlist.h" #include "src/common/log.h" +#include "src/common/fd.h" #include "src/common/safeopen.h" #include "src/common/slurm_errno.h" #include "src/common/xsignal.h" @@ -69,6 +70,7 @@ static int _drop_privileges(struct passwd *pwd); static int _reclaim_privileges(struct passwd *pwd); static int _become_user(slurmd_job_t *job); static int _unblock_all_signals(void); +static int _block_most_signals(void); static int _send_exit_msg(int rc, task_info_t *t); static int _complete_job(slurmd_job_t *job, int rc, int status); static void _send_launch_resp(slurmd_job_t *job, int rc); @@ -80,21 +82,25 @@ _setargs(slurmd_job_t *job, char **argv, int argc) { int i; size_t len = 0; - char *name = NULL; + char *arg = NULL; - for (i = 1; i < argc; i++) - len += strlen(argv[0]) + 1; + for (i = 0; i < argc; i++) + len += strlen(argv[i]) + 1; - xstrfmtcat(name, "slurmd [%d.%d]", job->jobid, job->stepid); + if (job->stepid == NO_VAL) + xstrfmtcat(arg, "[%d]", job->jobid); + else + xstrfmtcat(arg, "[%d.%d]", job->jobid, job->stepid); - if (len < strlen(name)) + if (len < (strlen(arg) + 7)) goto done; memset(argv[0], 0, len); - strncpy(argv[0], name, strlen(name)); + strncpy(argv[0], "slurmd", 6); + strncpy((*argv)+7, arg, strlen(arg)); done: - xfree(name); + xfree(arg); return; } @@ -104,6 +110,10 @@ int mgr_launch_tasks(launch_tasks_request_msg_t *msg, slurm_addr *cli) { slurmd_job_t *job; + char buf[256]; + + snprintf(buf, sizeof(buf), "[%d.%d]", msg->job_id, msg->job_step_id); + log_set_fpfx(buf); /* New process, so we must reinit shm */ if (shm_init() < 0) @@ -122,7 +132,7 @@ mgr_launch_tasks(launch_tasks_request_msg_t *msg, slurm_addr *cli) if (_run_job(job) < 0) goto error; - job_debug2(job, "%ld returned from slurmd_run_job()", getpid()); + debug2("%ld returned from slurmd_run_job()", getpid()); shm_fini(); return(SLURM_SUCCESS); error: @@ -241,6 +251,8 @@ mgr_launch_batch_job(batch_job_launch_msg_t *msg, slurm_addr *cli) job_update_shm(job); + _setargs(job, *conf->argv, *conf->argc); + if ((batchdir = _make_batch_dir(job)) == NULL) goto cleanup2; @@ -292,8 +304,8 @@ _run_job(slurmd_job_t *job) job_update_shm(job); if (interconnect_init(job) == SLURM_ERROR) { - job_error(job, "interconnect_init: %m"); - rc = -2; + error("interconnect_init: %m"); + rc = errno; /* shm_init(); */ goto fail; } @@ -311,8 +323,10 @@ _run_job(slurmd_job_t *job) /* * Temporarily drop permissions */ - if ((rc = _drop_privileges(job->pwd)) < 0) + if ((rc = _drop_privileges(job->pwd)) < 0) { + rc = ESLURMD_SET_UID_OR_GID_ERROR; goto fail2; + } /* Open input/output files and/or connections back to client */ @@ -321,6 +335,7 @@ _run_job(slurmd_job_t *job) if (_reclaim_privileges(spwd) < 0) error("sete{u/g}id(%ld/%ld): %m", spwd->pw_uid, spwd->pw_gid); + if (rc < 0) { rc = ESLURMD_IO_ERROR; goto fail2; @@ -330,14 +345,14 @@ _run_job(slurmd_job_t *job) _send_launch_resp(job, rc); _wait_for_all_tasks(job); - job_debug2(job, "all tasks exited, waiting on IO"); + debug2("all tasks exited, waiting on IO"); io_close_all(job); pthread_join(job->ioid, NULL); - job_debug2(job, "IO complete"); + debug2("IO complete"); interconnect_fini(job); /* ignore errors */ job_delete_shm(job); /* again, ignore errors */ - job_verbose(job, "job completed", rc); + verbose("job completed, rc = %d", rc); return rc; fail2: @@ -438,7 +453,6 @@ _run_batch_job(slurmd_job_t *job) pid_t sid, pid; struct passwd *spwd = getpwuid(getuid()); - if ((rc = io_spawn_handler(job)) < 0) { return ESLURMD_IO_ERROR; } @@ -524,7 +538,7 @@ _wait_for_all_tasks(slurmd_job_t *job) if ((pid < (pid_t) 0)) { if (errno == EINTR) _handle_attach_req(job); - job_error(job, "waitpid: %m"); + error("waitpid: %m"); /* job_cleanup() */ } for (i = 0; i < job->ntasks; i++) { @@ -659,22 +673,24 @@ _exec_all_tasks(slurmd_job_t *job) pid_t sid; int i; - job_debug3(job, "%ld entered _launch_tasks", getpid()); + debug3("%ld entered _launch_tasks", getpid()); xsignal(SIGPIPE, SIG_IGN); if ((sid = setsid()) < (pid_t) 0) { - job_error(job, "setsid: %m"); + error("setsid: %m"); } + _block_most_signals(); + if (shm_update_step_sid(job->jobid, job->stepid, sid) < 0) - job_error(job, "shm_update_step_sid: %m"); + error("shm_update_step_sid: %m"); - job_debug2(job, "invoking %d tasks", job->ntasks); + debug2("invoking %d tasks", job->ntasks); for (i = 0; i < job->ntasks; i++) { task_t t; - job_debug2(job, "going to fork task %d", i); + debug2("going to fork task %d", i); t.id = i; t.global_id = job->task[i]->gid; t.ppid = getpid(); @@ -690,12 +706,12 @@ _exec_all_tasks(slurmd_job_t *job) job->task[i]->pid = t.pid; - job_debug2(job, "%ld: forked child process %ld for task %d", + debug2("%ld: forked child process %ld for task %d", getpid(), (long) t.pid, i); - job_debug2(job, "going to add task %d to shm", i); + debug2("going to add task %d to shm", i); if (shm_add_task(job->jobid, job->stepid, &t) < 0) - job_error(job, "shm_add_task: %m"); - job_debug2(job, "task %d added to shm", i); + error("shm_add_task: %m"); + debug2("task %d added to shm", i); } @@ -752,6 +768,27 @@ _unblock_all_signals(void) return SLURM_SUCCESS; } +static int +_block_most_signals(void) +{ + sigset_t set; + if (sigemptyset(&set) < 0) { + error("sigemptyset: %m"); + return SLURM_ERROR; + } + sigaddset(&set, SIGINT); + sigaddset(&set, SIGTERM); + sigaddset(&set, SIGSTOP); + sigaddset(&set, SIGTSTP); + sigaddset(&set, SIGQUIT); + if (sigprocmask(SIG_BLOCK, &set, NULL) < 0) { + error("sigprocmask: %m"); + return SLURM_ERROR; + } + + return SLURM_SUCCESS; +} + static void _send_launch_resp(slurmd_job_t *job, int rc) { @@ -760,7 +797,7 @@ _send_launch_resp(slurmd_job_t *job, int rc) launch_tasks_response_msg_t resp; srun_info_t *srun = list_peek(job->sruns); - job_debug(job, "Sending launch resp rc=%d", rc); + debug("Sending launch resp rc=%d", rc); resp_msg.address = srun->resp_addr; resp_msg.data = &resp; @@ -784,7 +821,14 @@ static void _slurmd_job_log_init(slurmd_job_t *job) { char argv0[64]; - log_options_t logopt = LOG_OPTS_STDERR_ONLY; + + conf->log_opts.buffered = 1; + + /* + * Reset stderr logging to user requested level + * (Logfile and syslog levels remain the same) + */ + conf->log_opts.stderr_level = LOG_LEVEL_ERROR + job->debug; /* Connect slurmd stderr to job's stderr */ if (dup2(job->task[0]->perr[1], STDERR_FILENO) < 0) { @@ -792,11 +836,13 @@ _slurmd_job_log_init(slurmd_job_t *job) return; } - logopt.stderr_level += job->debug; + fd_set_nonblocking(STDERR_FILENO); snprintf(argv0, sizeof(argv0), "slurmd[%s]", conf->hostname); - /* reinitialize log to log on stderr */ - log_init(argv0, logopt, 0, NULL); + /* + * reinitialize log + */ + log_init(argv0, conf->log_opts, 0, NULL); } diff --git a/src/slurmd/no_interconnect.c b/src/slurmd/no_interconnect.c index 0e445b50f13f6ccf65efbdf2bee7330b095b8580..0102a698bb61e1403505c6466c95acc9ef0a74a1 100644 --- a/src/slurmd/no_interconnect.c +++ b/src/slurmd/no_interconnect.c @@ -28,6 +28,8 @@ #include <src/slurmd/interconnect.h> #include <src/slurmd/setenvpf.h> +#include "src/slurmd/shm.h" + int interconnect_init (slurmd_job_t *job) { return SLURM_SUCCESS; diff --git a/src/slurmd/req.c b/src/slurmd/req.c index 572673d1b960b00803dbbc18684de7e62182055b..4c9f821ee684c7c219775c39948297e7a35b3900 100644 --- a/src/slurmd/req.c +++ b/src/slurmd/req.c @@ -368,8 +368,11 @@ _kill_all_active_steps(uint32_t jobid) while ((s = list_next(i))) { if (s->jobid == jobid) { - shm_signal_step(jobid, s->stepid, SIGKILL); - shm_delete_step(jobid, s->stepid); + /* Kill entire process group + * (slurmd manager will clean up any stragglers) + */ + debug2("sending SIGKILL to process group %d", s->sid); + killpg(s->sid, SIGKILL); } } list_iterator_destroy(i); diff --git a/src/srun/fname.c b/src/srun/fname.c index 9b27f49b47fe45d4c17ed37e5f4fb83703becb68..c047d79c83294a3dc70767345b34abb5f107f884 100644 --- a/src/srun/fname.c +++ b/src/srun/fname.c @@ -80,7 +80,7 @@ fname_create(job_t *job, char *format) fname->type = IO_PER_TASK; if (wid) - xstrfmtcat(name, "%%"); + xstrcatchar(name, '%'); p++; break; diff --git a/src/srun/io.c b/src/srun/io.c index 956c911aad30824639f856bc965536e9d4eda975..85111d6c194580506ca9bc4fc9593a99877d6d89 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -167,14 +167,13 @@ _do_output(cbuf_t buf, FILE *out, int tasknum) fflush(out); } - if ((len = cbuf_used(buf))) - error ("Unable to print %d bytes output data", len); } static void _flush_io(job_t *job) { int i; + int len; debug3("flushing all io"); for (i = 0; i < opt.nprocs; i++) { @@ -186,6 +185,11 @@ _flush_io(job_t *job) _do_output(job->errbuf[i], job->errstream, i); if (job->err[i] != IO_DONE) _close_stream(&job->err[i], stderr, i); + + if ((len = cbuf_used(job->outbuf[i]))) + error ("Unable to print %d bytes output data", len); + if ((len = cbuf_used(job->outbuf[i]))) + error ("Unable to print %d bytes output data", len); } } @@ -216,11 +220,11 @@ _io_thr_poll(void *job_arg) else out_fd_state = WAITING_FOR_IO; - if (!job->efname->name) + if (!opt.efname) err_fd_state = IO_DONE; } - if (job->efname->name == IO_ALL && opt.efname) { + if ((job->efname->type == IO_ALL) && (err_fd_state != IO_DONE)) { err_fd_state = WAITING_FOR_IO; } else err_fd_state = IO_DONE; @@ -237,13 +241,13 @@ _io_thr_poll(void *job_arg) int eofcnt = 0; nfds = job->niofds; /* already have n ioport fds + stdin */ - if (job->stdinfd >= 0) { + if ((job->stdinfd >= 0) && !stdin_got_eof) { _poll_set_rd(fds[nfds], job->stdinfd); nfds++; } for (i = 0; i < opt.nprocs; i++) { - if (job->out[i] > 0) { + if (job->out[i] >= 0) { _poll_set_rd(fds[nfds], job->out[i]); if ( (cbuf_used(job->inbuf[i]) > 0) @@ -257,7 +261,7 @@ _io_thr_poll(void *job_arg) nfds++; } - if (job->err[i] > 0) { + if (job->err[i] >= 0) { _poll_set_rd(fds[nfds], job->err[i]); map[nfds].taskid = i; map[nfds].fd = &job->err[i]; @@ -317,10 +321,8 @@ _io_thr_poll(void *job_arg) } } - if ((job->stdinfd >= 0)) { - if (_poll_rd_isset(fds[i])) - _bcast_stdin(job->stdinfd, job); - if (_poll_hup(fds[i]) || _poll_err(fds[i])) + if ((job->stdinfd >= 0) && !stdin_got_eof) { + if (fds[i].revents) _bcast_stdin(job->stdinfd, job); ++i; } @@ -332,10 +334,10 @@ _io_thr_poll(void *job_arg) if ((revents & POLLERR) || (revents & POLLHUP)) _handle_pollerr(&map[i]); - if ((revents & POLLIN) && (*map[i].fd > 0)) + if ((revents & POLLIN) && (*map[i].fd >= 0)) _do_task_output_poll(&map[i]); - if ((revents & POLLOUT) && (*map[i].fd > 0)) + if ((revents & POLLOUT) && (*map[i].fd >= 0)) _do_task_input_poll(job, &map[i]); } } @@ -421,7 +423,7 @@ void report_task_status(job_t *job) for (i = 0; i < opt.nprocs; i++) { int state = job->task_state[i]; if ((state == SRUN_TASK_EXITED) - && ((job->err[i] > 0) || (job->out[i] > 0))) + && ((job->err[i] >= 0) || (job->out[i] >= 0))) state = 4; snprintf(buf, 256, "task%d", i); hostlist_push(hl[state], buf); @@ -614,9 +616,16 @@ _accept_io_stream(job_t *job, int i) continue; } - slurm_mutex_lock(&job->task_mutex); - job->task_state[hdr.task_id] = SRUN_TASK_RUNNING; - slurm_mutex_unlock(&job->task_mutex); + /* + * IO connection from task may come after task exits, + * in which case, state should be waiting for IO. + * + * Update to RUNNING now handled in msg.c + * + * slurm_mutex_lock(&job->task_mutex); + * job->task_state[hdr.task_id] = SRUN_TASK_RUNNING; + * slurm_mutex_unlock(&job->task_mutex); + */ fd_set_nonblocking(sd); if (hdr.type == SLURM_IO_STREAM_INOUT) @@ -653,6 +662,8 @@ _do_task_output(int *fd, FILE *out, cbuf_t buf, int tasknum) int dropped = 0; if ((len = cbuf_write_from_fd(buf, *fd, -1, &dropped)) <= 0) { + if ((len != 0)) + error("Error task %d IO: %m", tasknum); _close_stream(fd, out, tasknum); return len; } @@ -743,7 +754,7 @@ _write_all(job_t *job, cbuf_t cb, char *buf, size_t len, int taskid) again: n = cbuf_write(cb, buf, len, &dropped); - if ((n < len) && (job->out[taskid] > 0)) { + if ((n < len) && (job->out[taskid] >= 0)) { error("cbuf_write returned %d", n); _do_task_input(job, taskid); goto again; @@ -777,7 +788,7 @@ _bcast_stdin(int fd, job_t *job) if ((n = _readx(fd, buf, len)) <= 0) { if (n == 0) { /* got EOF */ - close(job->stdinfd); + close(job->stdinfd); job->stdinfd = IO_DONE; stdin_got_eof = true; return; diff --git a/src/srun/job.c b/src/srun/job.c index ae41837f7495bbdb2270dc68ff7a61b9962e1d83..9a4861c964b42d7bff0eabd0878ce65df5644262 100644 --- a/src/srun/job.c +++ b/src/srun/job.c @@ -31,6 +31,7 @@ #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> +#include <signal.h> #include "src/common/hostlist.h" #include "src/common/log.h" @@ -259,3 +260,11 @@ update_job_state(job_t *job, job_state_t state) pthread_mutex_unlock(&job->state_mutex); } +void +job_force_termination(job_t *job) +{ + info ("forcing job termination"); + update_job_state(job, SRUN_JOB_OVERDONE); + pthread_kill(job->ioid, SIGTERM); +} + diff --git a/src/srun/job.h b/src/srun/job.h index 8886d14f8a6d1acdeb5be16a5b1df5e66bb10afb..6b37374146949b52ab53ad2b18d0299d31406b8b 100644 --- a/src/srun/job.h +++ b/src/srun/job.h @@ -106,6 +106,7 @@ typedef struct srun_job { } job_t; void update_job_state(job_t *job, job_state_t newstate); +void job_force_termination(job_t *job); job_t * job_create_noalloc(void); job_t * job_create_allocation(resource_allocation_response_msg_t *resp); diff --git a/src/srun/msg.c b/src/srun/msg.c index 5989684d63da7e40a5be1e4e7db6e4c80f175744..7505ae595f7037ae69b1a8ecb3d5926d40565447 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -204,6 +204,18 @@ _process_launch_resp(job_t *job, launch_tasks_response_msg_t *msg) } } +static void +update_running_tasks(job_t *job, uint32_t nodeid) +{ + int i; + slurm_mutex_lock(&job->task_mutex); + for (i = 0; i < job->ntask[nodeid]; i++) { + uint32_t tid = job->tids[nodeid][i]; + job->task_state[tid] = SRUN_TASK_RUNNING; + } + slurm_mutex_unlock(&job->task_mutex); +} + static void update_failed_tasks(job_t *job, uint32_t nodeid) { @@ -252,8 +264,10 @@ _launch_handler(job_t *job, slurm_msg_t *resp) tv_launch_failure(); #endif return; - } else + } else { _process_launch_resp(job, msg); + update_running_tasks(job, msg->srun_node_id); + } } /* _confirm_launch_complete diff --git a/src/srun/opt.c b/src/srun/opt.c index 50053fd1c66a104303b86939d2405f0ea5c9d9b0..db75370124293270d44f588fd3adc398235bc3c1 100644 --- a/src/srun/opt.c +++ b/src/srun/opt.c @@ -179,7 +179,7 @@ struct poptOption runTable[] = { {"time", 't', POPT_ARG_INT, &opt.time_limit, OPT_TIME, "time limit", "minutes"}, - {"cddir", 'D', POPT_ARG_STRING, NULL, OPT_CDDIR, + {"chdir", 'D', POPT_ARG_STRING, NULL, OPT_CDDIR, "change current working directory of remote procs", "path"}, {"immediate", 'I', POPT_ARG_NONE, &opt.immediate, 0, @@ -664,7 +664,8 @@ static void _opt_args(int ac, char **av) case OPT_ATTACH: if (opt.allocate || opt.batch) { - error("can only specify one mode: allocate, attach or batch."); + error("can only specify one mode: " + "allocate, attach or batch."); exit(1); } mode = MODE_ATTACH; @@ -673,7 +674,8 @@ static void _opt_args(int ac, char **av) case OPT_ALLOCATE: if (opt.attach || opt.batch) { - error("can only specify one mode: allocate, attach or batch."); + error("can only specify one mode: " + "allocate, attach or batch."); exit(1); } mode = MODE_ALLOCATE; diff --git a/src/srun/srun.c b/src/srun/srun.c index d69ac3e9989f4a1be070a1bc5d643876aa8b6c37..fd8a25855923480b624122c074dca0fb9bdfee76 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -124,11 +124,6 @@ static int _set_batch_script_env(uint32_t jobid); static void _qsw_standalone(job_t *job); #endif -/* empty sigint handler */ -static void -_int_handler(int signal) -{ pthread_cancel(pthread_self());} - int srun(int ac, char **av) { @@ -214,6 +209,7 @@ srun(int ac, char **av) /* block most signals in all threads, except sigterm */ sigemptyset(&sigset); sigaddset(&sigset, SIGINT); + sigaddset(&sigset, SIGQUIT); sigaddset(&sigset, SIGTSTP); sigaddset(&sigset, SIGSTOP); if (sigprocmask(SIG_BLOCK, &sigset, NULL) != 0) @@ -265,7 +261,6 @@ srun(int ac, char **av) pthread_kill(job->jtid, SIGTERM); /* wait for stdio */ - xsignal(SIGQUIT, &_int_handler); if (pthread_join(job->ioid, NULL) < 0) { error ("Waiting on IO: %m"); } @@ -281,6 +276,8 @@ srun(int ac, char **av) slurm_complete_job(job->jobid, 0, 0); } + log_fini(); + exit(0); } @@ -329,20 +326,19 @@ _allocate_nodes(void) job.shared = 1; retries = 0; - while ((rc = slurm_allocate_resources(&job, &resp)) - == SLURM_FAILURE) { + while ((rc = slurm_allocate_resources(&job, &resp)) < 0) { if ((slurm_get_errno() == ESLURM_ERROR_ON_DESC_TO_RECORD_COPY) && (retries < MAX_RETRIES)) { + char *msg = "Slurm controller not responding, " + "sleeping and retrying"; if (retries == 0) - error ("Slurm controller not responding, sleeping and retrying"); + error (msg); else - debug ("Slurm controller not responding, sleeping and retrying"); + debug (msg); sleep (++retries); - } - else { - error("Unable to allocate resources: %s", - slurm_strerror(errno)); + } else { + error("Unable to allocate resources: %m"); return NULL; } } @@ -359,15 +355,16 @@ _allocate_nodes(void) old_job.uid = (uint32_t) getuid(); slurm_free_resource_allocation_response_msg (resp); sleep (2); + /* Keep polling until the job is allocated resources */ - while (slurm_confirm_allocation(&old_job, &resp) == - SLURM_FAILURE) { + while (slurm_confirm_allocation(&old_job, &resp) < 0) { if (slurm_get_errno() == ESLURM_JOB_PENDING) { debug3("Still waiting for allocation"); sleep (10); } else { - error("Unable to confirm resource allocation for job %u: %s", - old_job.job_id, slurm_strerror(errno)); + error("Unable to confirm resource " + "allocation for job %u: %m", + old_job.job_id); exit (1); } } @@ -497,6 +494,47 @@ _sigterm_handler(int signum) } } +static void +_handle_intr(job_t *job, time_t *last_intr, time_t *last_intr_sent) +{ + + if ((time(NULL) - *last_intr) > 1) { + info("interrupt (one more within 1 sec to abort)"); + report_task_status(job); + *last_intr = time(NULL); + } else { /* second Ctrl-C in half as many seconds */ + + /* terminate job */ + if (job->state != SRUN_JOB_OVERDONE) { + + info("sending Ctrl-C to job"); + *last_intr = time(NULL); + _fwd_signal(job, SIGINT); + + if ((time(NULL) - *last_intr_sent) < 1) + job_force_termination(job); + else + *last_intr_sent = time(NULL); + } else { + job_force_termination(job); + } + } +} + +static void +_sig_thr_setup(sigset_t *set) +{ + int rc; + + sigemptyset(set); + sigaddset(set, SIGINT); + sigaddset(set, SIGQUIT); + sigaddset(set, SIGTSTP); + sigaddset(set, SIGSTOP); + if ((rc = pthread_sigmask(SIG_BLOCK, set, NULL)) != 0) + error ("pthread_sigmask: %s", slurm_strerror(rc)); +} + /* simple signal handling thread */ static void * @@ -509,47 +547,24 @@ _sig_thr(void *arg) int signo; while (1) { - sigfillset(&set); - sigdelset(&set, SIGABRT); - sigdelset(&set, SIGSEGV); - sigdelset(&set, SIGQUIT); - sigdelset(&set, SIGUSR1); - sigdelset(&set, SIGUSR2); - pthread_sigmask(SIG_BLOCK, &set, NULL); + + _sig_thr_setup(&set); + sigwait(&set, &signo); debug2("recvd signal %d", signo); switch (signo) { case SIGINT: - if ((time(NULL) - last_intr) > 1) { - info("interrupt (one more within 1 " - "sec to abort)"); - report_task_status(job); - last_intr = time(NULL); - } else { /* second Ctrl-C in half as many seconds */ - /* terminate job */ - if (job->state != SRUN_JOB_OVERDONE) { - info("sending Ctrl-C to job"); - last_intr = time(NULL); - _fwd_signal(job, signo); - if ((time(NULL) - last_intr_sent) < 1) { - info("forcing termination"); - update_job_state(job, SRUN_JOB_OVERDONE); - pthread_kill(job->ioid, SIGTERM); - } - last_intr_sent = time(NULL); - } else { - info("forcing termination"); - pthread_kill(job->ioid, SIGTERM); - } - } + _handle_intr(job, &last_intr, &last_intr_sent); + break; + case SIGSTOP: + case SIGTSTP: + debug3("Ignoring SIGSTOP"); + break; + case SIGQUIT: + info("Quit"); + job_force_termination(job); break; default: - if (job->state != SRUN_JOB_OVERDONE) - _fwd_signal(job, signo); - else if (signo == SIGQUIT) { - info("forcing termination"); - pthread_kill(job->ioid, SIGTERM); - } break; } }