From 9b479e54aec723aa689a8f560b0e8bd0be24e042 Mon Sep 17 00:00:00 2001 From: Mark Grondona <mgrondona@llnl.gov> Date: Thu, 12 Dec 2002 01:01:37 +0000 Subject: [PATCH] o Major changes to support all methods of I/O handling. (i.e. support documented uses of --input,output,error) o Allow premature termination of srun (i.e. when IO is not yet complete) by insistent user. o added xstrfmtcat() and xmemcat() to xstring.c --- doc/man/man1/srun.1 | 112 +++++++++---- src/common/list.c | 2 +- src/common/log.c | 6 +- src/common/qsw.c | 2 + src/common/xstring.c | 58 ++++++- src/common/xstring.h | 14 ++ src/slurmd/Makefile.am | 1 + src/slurmd/fname.c | 107 +++++++++++++ src/slurmd/fname.h | 34 ++++ src/slurmd/io.c | 355 ++++++++++++++++++++++++++--------------- src/slurmd/job.c | 99 +++++------- src/slurmd/job.h | 11 +- src/slurmd/mgr.c | 1 + src/srun/Makefile.am | 2 +- src/srun/fname.c | 133 +++++++++++++++ src/srun/fname.h | 56 +++++++ src/srun/io.c | 211 +++++++++++++++++++----- src/srun/job.c | 206 +++++++++++++++++++++++- src/srun/job.h | 15 +- src/srun/launch.c | 22 ++- src/srun/msg.c | 23 ++- src/srun/opt.c | 119 +------------- src/srun/opt.h | 21 +-- src/srun/reattach.c | 43 +++-- src/srun/srun.c | 47 +++--- 25 files changed, 1231 insertions(+), 469 deletions(-) create mode 100644 src/slurmd/fname.c create mode 100644 src/slurmd/fname.h create mode 100644 src/srun/fname.c create mode 100644 src/srun/fname.h diff --git a/doc/man/man1/srun.1 b/doc/man/man1/srun.1 index 987ff097024..44c162ad083 100644 --- a/doc/man/man1/srun.1 +++ b/doc/man/man1/srun.1 @@ -76,7 +76,7 @@ overcommit resources. Normally, .B srun will not allocate more than one process per cpu. By specifying \fB\-\-overcommit\fR you are explicitly allowing more than one process -per cpu. However no more than \fMAX_TASKS_PER_NODE\fR tasks are +per cpu. However no more than \fBMAX_TASKS_PER_NODE\fR tasks are permitted to execute per node. .TP \fB\-T\fR, \fB\-\-threads\fR=\fInthreads\fR @@ -113,21 +113,21 @@ Specify a name for the job. The specified name will appear along with the job id number when querying running jobs on the system. The default is the supplied \fBexecutable\fR program's name. .TP -\fB\-o\fR, \fB\-\-output\fR=\fIout\fR -Specify how stdout is to be directed. By default, +\fB\-o\fR, \fB\-\-output\fR=\fImode\fR +Specify the mode for stdout redirection. By default, .B srun collects stdout from all tasks and line buffers this output to the attached terminal. With \fB\-\-output\fR stdout may be redirected -to a file, to one file per task, or to /dev/null. See \fBIO Redirection\fR -below. +to a file, to one file per task, or to /dev/null. See section +\fBIO Redirection\fR below for the various forms of \fImode\fR. .TP -\fB\-i\fR, \fB\-\-input\fR=\fIin\fR +\fB\-i\fR, \fB\-\-input\fR=\fImode\fR Specify how stdin is to redirected. By default, .B srun -redirects stdin to all tasks from /dev/null. See \fBIO Redirection\fR +redirects stdin from the terminal all tasks. See \fBIO Redirection\fR below for more options. .TP -\fB\-e\fR, \fB\-\-error\fR=\fIerr\fR +\fB\-e\fR, \fB\-\-error\fR=\fImode\fR Specify how stderr is to be redirected. By default, .B srun redirects stderr to the same file as stdout, if one is specified. The @@ -159,10 +159,6 @@ Specify how long to wait after the first task terminates before terminating all remaining tasks. The default value is unlimited. This can be useful to insure that a a job is terminated in a timely fashion in the event that one or more tasks terminate prematurely. -.TP -\fB\-d\fR, \fB\-\-debug\fR -enable debug output. Multiple \fB-d\fR's increase the debug level of -.B srun .PP Allocate options: .TP @@ -291,31 +287,83 @@ will consider this an error, as 15 processes cannot run across 16 nodes. By default stdout and stderr will be redirected from all tasks to the stdout and stderr of .B srun -, and stdin will be redirected from /dev/null to all tasks. This -behavior may be changed with the \fB\-\-output\fR, \fB\-\-error\fR, -and \fB\-\-input\fR (\fB\-o\fR, \fB\-e\fR, \fB\-i\fR) options. Valid -arguments to these options are +, and stdin will be redirected from the standard input of +.B srun +to all remote tasks. This behavior may be changed with the +\fB\-\-output\fR, \fB\-\-error\fR, and \fB\-\-input\fR +(\fB\-o\fR, \fB\-e\fR, \fB\-i\fR) options. Valid format specifications +for these options are .TP 10 -all -stdout stderr is redirected from all tasks to srun (This is the default). -stdin is forwarded to all tasks. -.TP -none -stdout and stderr are redirected to /dev/null. -stdin is redirected from /dev/null (This is the default for stdin) +\fBall\fR +stdout stderr is redirected from all tasks to srun. +stdin is broadcast to all remote tasks. +(This is the default behavior) +.TP +\fBnone\fR +stdout and stderr is not recieved from any task. +stdin is not sent to any task (stdin is closed). +.TP +\fItaskid\fR +stdout and/or stderr are redirected from only the task with relative +id equal to \fItaskid\fR, where 0 <= \fItaskid\fR <= \fIntasks\fR, +where \fIntasks\fR is the total number of tasks in the current job step. +stdin is redirected from the stdin of +.B srun +to this same task. .TP filename -stdout and stderr are redirected to the named file (relative to the -current working directory of the job). stdin is redirected from the -named file. +.B srun +will redirect stdout and/or stderr to the named file from all tasks. +stdin will be redirected from the named file and broadcast to all +tasks in the job. .TP format string -If a format string is provided (such as "output.%d"), -.B srun -will open one file per task passing the task id as the argument to -the format string. The format specifier may be any valid printf -format, as long as it takes a numeric argument. +.B srun +allows for a format string to be used to generate the named IO file +described above. The following list of format specifiers may be +used in the format string to generate a filename that will be +unique to a given jobid, stepid, node, or task. In each case, +the appropiate number of files are opened and associated with +the corresponding tasks. +.RS 10 +.TP +%J +jobid.stepid of the running job. (e.g. "128.0") +.TP +%j +jobid of the running job. +.TP +%s +stepid of the running job. +.TP +%N +short hostname. This will create a separate IO file per node. +.TP +%n +Node identifier relative to current job (e.g. "0" is the first node of +the running job) This will create a separate IO file per node. +.TP +%t +task identifier (rank) relative to current job. This will create a +separate IO file per task. +.PP +A number placed between the percent character and format specifier may be +used to zero-pad the result in the IO filename. This number is ignored if +the format specifier corresponds to non-numeric data (%N for example). + +Some examples of how the format string may be used for a 4 task job step +with a Job ID of 128 and step id of 0 are included below: +.TP 15 +job%J.out +job128.0.out +.TP +job%4j.out +job0128.out +.TP +job%j-%2t.out +job128-00.out, job128-01.out, ... .PP +.RS -10 .PP .B "Allocate Mode" .PP @@ -363,7 +411,7 @@ SLURM_CPUS_PER_TASK \fB\-c, \-\-ncpus\-per\-task\fR=\fIn\fR .TP SLURM_DEBUG -\fB\-d, \-\-debug\fR +\fB\-v, \-\-verbose\fR .TP SLURM_DISTRIBUTION \fB\-m, \-\-distribution\fR=(\fIblock|cyclic\fR) diff --git a/src/common/list.c b/src/common/list.c index 77fe0ef06c7..efe4bff56f9 100644 --- a/src/common/list.c +++ b/src/common/list.c @@ -783,7 +783,7 @@ list_alloc_aux (int size, void *pfreelist) if (!*pfree) { if ((*pfree = malloc(LIST_ALLOC * size))) { px = *pfree; - plast = *pfree + ((LIST_ALLOC - 1) * size); + plast = (void **) ((char *) *pfree + ((LIST_ALLOC - 1) * size)); while (px < plast) *px = (char *) px + size, px = *px; *plast = NULL; diff --git a/src/common/log.c b/src/common/log.c index 9b2f9c8b20a..4eefdfe6ace 100644 --- a/src/common/log.c +++ b/src/common/log.c @@ -322,7 +322,7 @@ static char *vxstrfmt(const char *fmt, va_list ap) /* * concatenate result of xstrfmt() to dst, expanding dst if necessary */ -static void xstrfmtcat(char **dst, const char *fmt, ...) +static void xlogfmtcat(char **dst, const char *fmt, ...) { va_list ap; char *buf = NULL; @@ -415,7 +415,7 @@ static void log_msg(log_level_t level, const char *fmt, va_list args) } if (level <= log->opt.logfile_level && log->logfp != NULL) { - xstrfmtcat(&msgbuf, "[%M] %s%s", pfx, buf); + xlogfmtcat(&msgbuf, "[%M] %s%s", pfx, buf); if (strlen(buf) > 0 && buf[strlen(buf) - 1] == '\n') fprintf(log->logfp, "%s", msgbuf); @@ -427,7 +427,7 @@ static void log_msg(log_level_t level, const char *fmt, va_list args) } if (level <= log->opt.syslog_level) { - xstrfmtcat(&msgbuf, "%s%s", pfx, buf); + xlogfmtcat(&msgbuf, "%s%s", pfx, buf); openlog(log->argv0, LOG_PID, log->facility); syslog(priority, "%.500s", msgbuf); diff --git a/src/common/qsw.c b/src/common/qsw.c index 038dc807bf1..bf55c37d06d 100644 --- a/src/common/qsw.c +++ b/src/common/qsw.c @@ -280,6 +280,8 @@ qsw_alloc_jobinfo(qsw_jobinfo_t *jp) void qsw_free_jobinfo(qsw_jobinfo_t j) { + if (j == NULL) + return; assert(j->j_magic == QSW_JOBINFO_MAGIC); assert(j->j_ctx == NULL); j->j_magic = 0; diff --git a/src/common/xstring.c b/src/common/xstring.c index 3c7d3be7167..22c52fbb922 100644 --- a/src/common/xstring.c +++ b/src/common/xstring.c @@ -4,8 +4,9 @@ ****************************************************************************** * Copyright (C) 2002 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). - * Written by Mark Grondona <grondona@llnl.gov>, - * Jim Garlick <garlick@llnl.gov>, et. al. + * Written by Jim Garlick <garlick@llnl.gov> + * Mark Grondona <grondona@llnl.gov>, et al. + * * UCRL-CODE-2002-040. * * This file is part of SLURM, a resource management program. @@ -33,7 +34,7 @@ #include <string.h> #include <stdio.h> #if HAVE_STRERROR_R && !HAVE_DECL_STRERROR_R -//char *strerror_r(int, char *, int); +char *strerror_r(int, char *, int); #endif #include <errno.h> #if HAVE_UNISTD_H @@ -44,11 +45,12 @@ # include <pthread.h> #endif -#include <xmalloc.h> -#include <xstring.h> -#include <strlcpy.h> -#include <xassert.h> +#include <stdarg.h> +#include "src/common/xmalloc.h" +#include "src/common/xstring.h" +#include "src/common/strlcpy.h" +#include "src/common/xassert.h" #include "src/common/slurm_errno.h" #define XFGETS_CHUNKSIZE 64 @@ -158,6 +160,48 @@ void _xstrftimecat(char **buf, const char *fmt) _xstrcat(buf, p); } +/* + * append formatted string with printf-style args to buf, expanding + * buf as needed + */ +int _xstrfmtcat(char **str, const char *fmt, ...) +{ + va_list ap; + int rc; + char buf[4096]; + + va_start(ap, fmt); + rc = vsnprintf(buf, 4096, fmt, ap); + va_end(ap); + + xstrcat(*str, buf); + + return rc; +} + +/* + * append a range of memory from start to end to the string str, + * expanding str as needed + */ +void _xmemcat(char **str, char *start, char *end) +{ + char buf[4096]; + size_t len; + + xassert(end >= start); + + len = (size_t) end - (size_t) start; + + if (len == 0) + return; + + if (len > 4095) + len = 4095; + + memcpy(buf, start, len); + buf[len] = '\0'; + xstrcat(*str, buf); +} /* * Replacement for libc basename diff --git a/src/common/xstring.h b/src/common/xstring.h index e1679b4436e..05efbca481e 100644 --- a/src/common/xstring.h +++ b/src/common/xstring.h @@ -34,6 +34,8 @@ #define xstrcatchar(__p, __c) _xstrcatchar(&(__p), __c) #define xslurm_strerrorcat(__p) _xslurm_strerrorcat(&(__p)) #define xstrftimecat(__p, __fmt) _xstrftimecat(&(__p), __fmt) +#define xstrfmtcat(__p, __fmt, args...) _xstrfmtcat(&(__p), __fmt, ## args) +#define xmemcat(__p, __s, __e) _xmemcat(&(__p), __s, __e) /* ** The following functions take a ptr to a string and expand the @@ -67,6 +69,18 @@ void _xslurm_strerrorcat(char **str); */ void _xstrftimecat(char **str, const char *fmt); +/* +** concatenate printf-style formatted string onto str +** return value is result from vsnprintf(3) +*/ +int _xstrfmtcat(char **str, const char *fmt, ...); + +/* +** concatenate range of memory from start to end (not including end) +** onto str. +*/ +void _xmemcat(char **str, char *start, char *end); + /* ** strdup which uses xmalloc routines */ diff --git a/src/slurmd/Makefile.am b/src/slurmd/Makefile.am index c22b236c759..c1965aba844 100644 --- a/src/slurmd/Makefile.am +++ b/src/slurmd/Makefile.am @@ -36,6 +36,7 @@ common_sources = \ io.c io.h \ semaphore.c semaphore.h \ shm.c shm.h \ + fname.c fname.h \ setenvpf.c setenvpf.h \ interconnect.h diff --git a/src/slurmd/fname.c b/src/slurmd/fname.c new file mode 100644 index 00000000000..161c2a92cdb --- /dev/null +++ b/src/slurmd/fname.c @@ -0,0 +1,107 @@ +/*****************************************************************************\ + * src/slurmd/fname.c - IO filename creation routine (slurmd specific) + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <ctype.h> + +#include "src/slurmd/job.h" +#include "src/slurmd/fname.h" +#include "src/slurmd/slurmd.h" + +#include "src/common/xmalloc.h" +#include "src/common/xstring.h" +#include "src/common/xassert.h" + +/* + * Max zero-padding width + */ +#define MAX_WIDTH 10 + +/* Create an IO filename from job parameters and the filename format + * sent from client + */ +char * +fname_create(slurmd_job_t *job, const char *format, int taskid) +{ + unsigned long int wid = 0; + char *name = NULL; + char *p, *q; + + q = p = format; + while(*p != '\0') { + if (*p == '%') { + if (isdigit(*(++p))) { + xmemcat(name, q, p - 1); + if ((wid = strtoul(p, &p, 10)) > MAX_WIDTH) + wid = MAX_WIDTH; + q = p - 1; + if (*p == '\0') + break; + } + + switch (*p) { + case 't': /* '%t' => taskid */ + xmemcat(name, q, p - 1); + xstrfmtcat(name, "%0*d", wid, taskid); + q = ++p; + break; + case 'n': /* '%n' => nodeid */ + xmemcat(name, q, p - 1); + xstrfmtcat(name, "%0*d", wid, job->nodeid); + q = ++p; + break; + case 'N': /* '%N' => node name */ + xmemcat(name, q, p - 1); + xstrfmtcat(name, "%s", conf->hostname); + q = ++p; + break; + case 'J': + case 'j': + xmemcat(name, q, p - 1); + xstrfmtcat(name, "%0*d", wid, job->jobid); + + if ((*p == 'J') && (job->stepid != NO_VAL)) + xstrfmtcat(name, ".%d", job->stepid); + q = ++p; + break; + + default: + break; + } + wid = 0; + + } else + p++; + } + + if (q != p) + xmemcat(name, q, p); + + return name; +} + diff --git a/src/slurmd/fname.h b/src/slurmd/fname.h new file mode 100644 index 00000000000..65148796af5 --- /dev/null +++ b/src/slurmd/fname.h @@ -0,0 +1,34 @@ +/*****************************************************************************\ + * src/slurmd/fname.h - IO filename creation routine (slurmd specific) + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#ifndef _SLURMD_FNAME_H +#define _SLURMD_FNAME_H + +#include "src/slurmd/job.h" + +char *fname_create(slurmd_job_t *job, const char *fmt, int taskid); + +#endif /* !_SLURMD_FNAME_H */ diff --git a/src/slurmd/io.c b/src/slurmd/io.c index 43ba1746d73..4f02969a1f5 100644 --- a/src/slurmd/io.c +++ b/src/slurmd/io.c @@ -58,6 +58,7 @@ #include "src/slurmd/job.h" #include "src/slurmd/shm.h" #include "src/slurmd/io.h" +#include "src/slurmd/fname.h" typedef enum slurmd_io_tupe { TASK_STDERR = 0, @@ -103,21 +104,28 @@ struct io_info { }; +static bool _isa_client(struct io_info *io); +static bool _isa_task(struct io_info *io); + static int _io_init_pipes(task_info_t *t); static int _io_prepare_clients(slurmd_job_t *); static int _io_prepare_tasks(slurmd_job_t *); -static int _io_prepare_files(slurmd_job_t *); static void * _io_thr(void *); static int _io_write_header(struct io_info *, srun_info_t *); static void _io_connect_objs(io_obj_t *, io_obj_t *); static int _validate_io_list(List objList); static int _shutdown_task_obj(struct io_info *t); static int find_obj(void *obj, void *key); -static int find_fd(void *obj, void *key); +/* static int find_fd(void *obj, void *key); */ static bool _isa_client(struct io_info *io); static bool _isa_task(struct io_info *io); +static void _obj_close(io_obj_t *obj, List objs); static void _io_client_attach(io_obj_t *, io_obj_t *, io_obj_t *, List objList); +static int _open_output_file(slurmd_job_t *job, task_info_t *t, + char *fname, slurmd_io_type_t type); +static int _open_stdin_file(slurmd_job_t *job, task_info_t *t, + srun_info_t *srun); static struct io_obj * _io_obj_create(int fd, void *arg); static struct io_info * _io_info_create(uint32_t id); @@ -142,7 +150,6 @@ static int _client_read(io_obj_t *, List); static int _task_error(io_obj_t *, List); static int _client_error(io_obj_t *, List); static int _connecting_write(io_obj_t *, List); -static int _file_write(io_obj_t *, List); /* Task Output operations (TASK_STDOUT, TASK_STDERR) * These objects are never writable -- @@ -227,12 +234,9 @@ io_spawn_handler(slurmd_job_t *job) /* open 2*ntask initial connections or files for stdout/err * append these to objs list */ - if ((list_count(job->sruns) > 0) && (_io_prepare_clients(job) < 0)) + if (_io_prepare_clients(job) < 0) return SLURM_FAILURE; - if (_io_prepare_files(job) < 0) - slurm_seterrno_ret(ESCRIPT_OPEN_OUTPUT_FAILED); - return 0; } @@ -246,40 +250,82 @@ _xclose(int fd) return rc; } -/* Close child fds in parent */ + + +/* + * Close child fds in parent as well as + * any stdin io objs in job->objs + * + */ static void _io_finalize(task_info_t *t) { + struct io_info *in = t->in->arg; + ListIterator i; + struct io_info *io; + + if (_xclose(t->pin[0] ) < 0) error("close(stdin) : %m"); if (_xclose(t->pout[1]) < 0) error("close(stdout): %m"); if (_xclose(t->perr[1]) < 0) error("close(stderr): %m"); + + in->disconnected = 1; + /* close stdin objs + */ + if (!in->writers) + return; + + i = list_iterator_create(in->writers); + while ((io = list_next(i))) { + if (io->obj->fd > 0) { + io->eof = 1; + } + } + list_iterator_destroy(i); } void io_close_all(slurmd_job_t *job) { int i; + for (i = 0; i < job->ntasks; i++) _io_finalize(job->task[i]); + + /* Signal IO thread to close appropriate + * client connections + */ + pthread_kill(job->ioid, SIGHUP); } static void _handle_unprocessed_output(slurmd_job_t *job) { int i; + task_info_t *t; + struct io_info *io; + List readers; + size_t n = 0; /* XXX Do something with unwritten IO */ for (i = 0; i < job->ntasks; i++) { - size_t n; - task_info_t *t = job->task[i]; - List readers = ((struct io_info *)t->out->arg)->readers; - struct io_info *io = list_peek(readers); + if (!(t = job->task[i])) + continue; + if (!(readers = ((struct io_info *)t->out->arg)->readers)) + continue; + if (!(io = list_peek(readers))) + continue; + if (io->buf && (n = cbuf_used(io->buf))) job_error(job, "%ld bytes of stdout unprocessed", n); - readers = ((struct io_info *)t->err->arg)->readers; - io = list_peek(readers); + + if (!(readers = ((struct io_info *)t->err->arg)->readers)) + continue; + if (!(io = list_peek(readers))) + continue; + if (io->buf && (n = cbuf_used(io->buf))) job_error(job, "%ld bytes of stderr unprocessed", n); } @@ -372,6 +418,37 @@ _io_add_connecting(slurmd_job_t *job, task_info_t *t, srun_info_t *srun, list_append(job->objs, (void *)obj); } +/* + * If filename is given for stdout/err/in, open appropriate file, + * otherwise create a connecting client back to srun process. + */ +static int +_io_prepare_one(slurmd_job_t *j, task_info_t *t, srun_info_t *s) +{ + if (s->ofname) { + if (_open_output_file(j, t, s->ofname, CLIENT_STDOUT) < 0) + return SLURM_FAILURE; + } else { + _io_add_connecting(j, t, s, CLIENT_STDOUT); + } + + if (s->efname) { + if (_open_output_file(j, t, s->efname, CLIENT_STDERR) < 0) + return SLURM_FAILURE; + } else { + _io_add_connecting(j, t, s, CLIENT_STDERR); + } + + if (s->ifname) { + if (_open_stdin_file(j, t, s) < 0) + return SLURM_FAILURE; + } else if (s->ofname) { + _io_add_connecting(j, t, s, CLIENT_STDIN); + } + + return SLURM_SUCCESS; +} + /* * create initial client objs for N tasks */ @@ -379,26 +456,21 @@ static int _io_prepare_clients(slurmd_job_t *job) { int i; + char host[256]; + short port; srun_info_t *srun; - char host[256]; - short port; - - /* xassert(list_count(job->sruns) == 1); */ srun = list_peek(job->sruns); - if (srun->noconnect) - return SLURM_SUCCESS; + xassert(srun != NULL); slurm_get_addr(&srun->ioaddr, &port, host, sizeof(host)); - debug2("connecting IO back to %s:%d", host, port); + debug2("connecting IO back to %s:%d", host, ntohs(port)); - /* - * connect back to clients for stdin/out/err + /* Connect stdin/out/err to either a remote srun or + * local file */ for (i = 0; i < job->ntasks; i++) { - list_append(job->task[i]->srun_list, (void *) srun); - _io_add_connecting(job, job->task[i], srun, CLIENT_STDOUT); - _io_add_connecting(job, job->task[i], srun, CLIENT_STDERR); + _io_prepare_one(job, job->task[i], srun); /* kick IO thread */ pthread_kill(job->ioid, SIGHUP); @@ -429,27 +501,27 @@ _open_task_file(char *filename, int flags) } static int -_open_output_file(slurmd_job_t *job, task_info_t *t, slurmd_io_type_t type) +_open_output_file(slurmd_job_t *job, task_info_t *t, char *fmt, + slurmd_io_type_t type) { - int fd = -1; - io_obj_t *obj = NULL; - int flags = O_CREAT|O_TRUNC|O_APPEND|O_WRONLY; - char *fname; + int fd = -1; + io_obj_t *obj = NULL; + int flags = O_CREAT|O_TRUNC|O_APPEND|O_WRONLY; + char *fname = fname_create(job, fmt, t->gid); xassert((type == CLIENT_STDOUT) || (type == CLIENT_STDERR)); - fname = (type == CLIENT_STDOUT) ? t->ofname : t->efname; if ((fd = _open_task_file(fname, flags)) > 0) { - verbose("opened `%s' for %s fd %d", fname, _io_str[type], fd); + debug("opened `%s' for %s fd %d", fname, _io_str[type], fd); obj = _io_obj(job, t, fd, type); _obj_set_unreadable(obj); - obj->ops->handle_write = &_file_write; xassert(obj->ops->writable != NULL); if (type == CLIENT_STDOUT) _io_client_attach(obj, t->out, NULL, job->objs); else _io_client_attach(obj, t->err, NULL, job->objs); } + xfree(fname); _validate_io_list(job->objs); @@ -457,41 +529,22 @@ _open_output_file(slurmd_job_t *job, task_info_t *t, slurmd_io_type_t type) } static int -_open_stdin_file(slurmd_job_t *job, task_info_t *t) +_open_stdin_file(slurmd_job_t *job, task_info_t *t, srun_info_t *srun) { int fd = -1; io_obj_t *obj = NULL; int flags = O_RDONLY; + char *fname = fname_create(job, srun->ifname, t->gid); - if ((fd = _open_task_file(t->ifname, flags)) > 0) { + if ((fd = _open_task_file(fname, flags)) > 0) { obj = _io_obj(job, t, fd, CLIENT_STDIN); _obj_set_unwritable(obj); _io_client_attach(obj, NULL, t->in, job->objs); } + xfree(fname); return fd; } -static int -_io_prepare_files(slurmd_job_t *job) -{ - int i; - - if (!job->ofname && !job->efname && !job->ifname) - return SLURM_SUCCESS; - - for (i = 0; i < job->ntasks; i++) { - if (_open_output_file(job, job->task[i], CLIENT_STDOUT) < 0) - return SLURM_FAILURE; - if (_open_output_file(job, job->task[i], CLIENT_STDERR) < 0) - return SLURM_FAILURE; - if (job->ifname && (_open_stdin_file(job, job->task[i]) < 0)) - return SLURM_FAILURE; - - pthread_kill(job->ioid, SIGHUP); - } - - return SLURM_SUCCESS; -} /* Attach io obj "client" as a reader of 'writer' and a writer to 'reader' * if 'reader' is NULL client will have no readers. @@ -511,16 +564,20 @@ _io_client_attach(io_obj_t *client, io_obj_t *writer, xassert((dst == NULL) || (dst->magic == IO_MAGIC)); if (writer == NULL) { - debug3("connecting %s to reader only", _io_str[cli->type]); - /* simple case: connect client to reader only and return + /* + * Only connect new client to reader if the + * reader is still available. + * */ - _io_connect_objs(client, reader); - list_append(objList, client); + if (reader->fd < 0 || dst->disconnected) + _obj_close(client, objList); + else + _io_connect_objs(client, reader); return; } io = list_peek(src->readers); - xassert((io == NULL) || (io->magic == IO_MAGIC)); + xassert((io == NULL) || (io->magic == IO_MAGIC)); /* Check to see if src's first reader has disconnected, * if so, replace the object with this client, if not, @@ -548,6 +605,11 @@ _io_client_attach(io_obj_t *client, io_obj_t *writer, */ list_delete_all(objList, (ListFindF)find_obj, client); + /* + * Rewind a few lines if possible + */ + cbuf_rewind_line(io->buf, 256, -1); + /* * connect resurrected client ("io") to reader * if (reader != NULL). @@ -588,12 +650,14 @@ _io_connect_objs(io_obj_t *obj1, io_obj_t *obj2) struct io_info *dst = (struct io_info *) obj2->arg; xassert(src->magic == IO_MAGIC); xassert(dst->magic == IO_MAGIC); + if (!list_find_first(src->readers, (ListFindF)find_obj, dst)) list_append(src->readers, dst); if (!list_find_first(dst->writers, (ListFindF)find_obj, src)) list_append(dst->writers, src); } +/* static int find_fd(void *obj, void *key) { @@ -602,6 +666,7 @@ find_fd(void *obj, void *key) return (((io_obj_t *)obj)->fd == *((int *)key)); } +*/ static int find_obj(void *obj, void *key) @@ -636,7 +701,7 @@ _io_disconnect(struct io_info *src, struct io_info *dst) static void _io_disconnect_client(struct io_info *client, List objs) { - bool destroy = false; + bool destroy = true; struct io_info *t; ListIterator i; @@ -647,19 +712,16 @@ _io_disconnect_client(struct io_info *client, List objs) /* Our client becomes a ghost */ client->disconnected = 1; - - debug("%s has %d writers", _io_str[client->type], - client->writers ? list_count(client->writers) : 0); if (client->writers) { /* delete client from its writer->readers list */ i = list_iterator_create(client->writers); while ((t = list_next(i))) { - if (list_count(t->readers) > 1) { - destroy = true; + if (list_count(t->readers) > 1) _io_disconnect(t, client); - } + else + destroy = false; } list_iterator_destroy(i); } @@ -668,24 +730,22 @@ _io_disconnect_client(struct io_info *client, List objs) /* delete client from its reader->writers list */ i = list_iterator_create(client->readers); - while ((t = list_next(i))) { - if (list_count(t->writers) > 1) { - _io_disconnect(client, t); - } - } + while ((t = list_next(i))) + _io_disconnect(client, t); list_iterator_destroy(i); } xassert(client == client->obj->arg); - if (!client->eof && client->buf) - cbuf_rewind_line(client->buf, 256, -1); + if (!destroy) + return; - if (destroy) { - if (!list_delete_all(objs, (ListFindF)find_obj, client->obj)) - error("Unable to destroy %s %d (%p)", - _io_str[client->type], client->id, client); - } + debug3("Going to destroy %s %d", _io_str[client->type], client->id); + if (list_delete_all(objs, (ListFindF)find_obj, client->obj) <= 0) + error("Unable to destroy %s %d (%p)", + _io_str[client->type], client->id, client); + + return; } static bool @@ -751,9 +811,14 @@ _io_obj(slurmd_job_t *job, task_info_t *t, int fd, int type) io->buf = cbuf_create(1024, 1048576); io->writers = list_create(NULL); break; - case CLIENT_STDIN: + case CLIENT_STDIN: obj->ops = _ops_copy(&client_ops); + _obj_set_unwritable(obj); io->readers = list_create(NULL); + /* + * Connected stdin still needs output buffer + */ + io->buf = cbuf_create(256, 1024); break; default: error("io: unknown I/O obj type %d", type); @@ -800,6 +865,7 @@ io_obj_destroy(io_obj_t *obj) xfree(obj->ops); break; case CLIENT_STDIN: + cbuf_destroy(io->buf); xfree(obj->ops); list_destroy(io->readers); break; @@ -865,9 +931,11 @@ _io_write_header(struct io_info *client, srun_info_t *srun) hdr.version = SLURM_PROTOCOL_VERSION; memcpy(hdr.key, srun->key->data, SLURM_SSL_SIGNATURE_LENGTH); hdr.task_id = client->id; - hdr.type = client->type == CLIENT_STDOUT ? - SLURM_IO_STREAM_INOUT : - SLURM_IO_STREAM_SIGERR; + + if ((client->type == CLIENT_STDOUT) || (client->type == CLIENT_STDIN)) + hdr.type = SLURM_IO_STREAM_INOUT; + else + hdr.type = SLURM_IO_STREAM_SIGERR; pack_io_stream_header(&hdr, buffer); @@ -974,11 +1042,14 @@ _writable(io_obj_t *obj) xassert(io->magic == IO_MAGIC); - debug3("_writable(): task %d fd %d %s [%d %d %d]", + debug3("_writable(): [%p] task %d fd %d %s [%d %d %d]", io, io->id, obj->fd, _io_str[io->type], io->disconnected, cbuf_used(io->buf), io->eof); - rc = (!io->disconnected && ((cbuf_used(io->buf) > 0) || io->eof)); + rc = ((io->obj->fd > 0) + && !io->disconnected + && ((cbuf_used(io->buf) > 0) || io->eof)); + if (rc) debug3("%d %s is writable", io->id, _io_str[io->type]); @@ -1011,8 +1082,9 @@ _write(io_obj_t *obj, List objs) while ((n = cbuf_read_to_fd(io->buf, obj->fd, -1)) < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) break; + if ((errno == EPIPE) || (errno == EINVAL) || (errno == EBADF)) + _obj_close(obj, objs); error("write failed: <task %d>: %m", io->id); - _obj_close(obj, objs); return -1; } @@ -1021,33 +1093,33 @@ _write(io_obj_t *obj, List objs) return 0; } -/* flush after writing data to file - */ -static int -_file_write(io_obj_t *obj, List objs) -{ - int rc = _write(obj, objs); - fdatasync(obj->fd); - return rc; -} - static void _do_attach(struct io_info *io) { - task_info_t *t; + task_info_t *t; xassert(io != NULL); xassert(io->magic == IO_MAGIC); - xassert((io->type == CLIENT_STDOUT) || (io->type == CLIENT_STDERR)); + xassert(_isa_client(io)); io->obj->ops = _ops_copy(&client_ops); - t = io->task; + t = io->task; - if (io->type == CLIENT_STDOUT) + switch (io->type) { + case CLIENT_STDOUT: _io_client_attach(io->obj, t->out, t->in, io->job->objs); - else + break; + case CLIENT_STDERR: _io_client_attach(io->obj, t->err, NULL, io->job->objs); + break; + case CLIENT_STDIN: + _io_client_attach(io->obj, NULL, t->in, io->job->objs); + break; + default: + fatal("Unknown client type %d in do_attach()", io->type); + + } } /* Write method for client objects which are connecting back to the @@ -1060,15 +1132,17 @@ _connecting_write(io_obj_t *obj, List objs) int n; xassert(io->magic == IO_MAGIC); - xassert((io->type == CLIENT_STDOUT) || (io->type == CLIENT_STDERR)); + xassert(_isa_client(io)); debug3("Need to write %ld bytes to connecting %s %d", cbuf_used(io->buf), _io_str[io->type], io->id); while ((n = cbuf_read_to_fd(io->buf, obj->fd, -1)) < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) continue; - error("write failed: <task %d>: %m", io->id); - _obj_close(obj, objs); + if ((errno == EPIPE) || (errno == EINVAL) || (errno == EBADF)) + _obj_close(obj, objs); + else + error("write failed: <task %d>: %m", io->id); return -1; } debug3("Wrote %d bytes to %s %d", n, _io_str[io->type], io->id); @@ -1087,29 +1161,31 @@ _connecting_write(io_obj_t *obj, List objs) static int _shutdown_task_obj(struct io_info *t) { - List l; ListIterator i; + List rlist; struct io_info *r; xassert(_isa_task(t)); - l = (t->type == TASK_STDIN) ? t->writers : t->readers; - - i = list_iterator_create(l); - while ((r = list_next(i))) { - /* Copy EOF to all readers or writers */ + debug3("shutdown_task_obj: %d %s [%d readers, %d writers]", + t->id, _io_str[t->type], + (t->readers ? list_count(t->readers) : 0), + (t->writers ? list_count(t->writers) : 0)); + + t->disconnected = 1; + + rlist = t->writers ? : t->readers; + + /* Task objects do not get destroyed. + * Simply propagate the EOF to the clients + */ + i = list_iterator_create(rlist); + while ((r = list_next(i))) r->eof = 1; - /* XXX When is it ok to destroy a task obj? - * perhaps only if the buffer is empty ... - * but definitely not before then. - * - * For now, never destroy the task objects - * - */ - /* list_delete_all(rlist, (ListFindF) find_obj, t); */ - } list_iterator_destroy(i); + _validate_io_list(t->job->objs); + return 0; } @@ -1194,18 +1270,38 @@ _client_read(io_obj_t *obj, List objs) if ((n = read(obj->fd, (void *) buf, len)) < 0) { if (errno == EINTR) goto again; - if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) - fatal("client read"); error("read from client %ld: %m", client->id); return -1; } - debug("read %d bytes from %s %d", n, _io_str[client->type], + debug3("read %d bytes from %s %d", n, _io_str[client->type], client->id); - if (n == 0) { /* got eof, disconnect this client */ - debug3("client %d closed connection", client->id); - _obj_close(obj, objs); + if (n == 0) { /* got eof, pass this eof to readers */ + debug3("%s %d stdin closed connection", _io_str[client->type], + client->id); + /* + * Do not read from this stdin any longer + */ + _obj_set_unreadable(obj); + + /* + * Loop through this client's readers, + * noting that EOF was recvd only if this + * client is the only writer + */ + if (client->readers) { + i = list_iterator_create(client->readers); + while((reader = list_next(i))) { + if (list_count(reader->writers) == 1) + reader->eof = 1; + list_delete_all( reader->writers, + (ListFindF) find_obj, + client ); + } + list_iterator_destroy(i); + } + return 0; } @@ -1219,9 +1315,8 @@ _client_read(io_obj_t *obj, List objs) * Copy cbuf to all readers */ i = list_iterator_create(client->readers); - while((reader = list_next(i))) { - n = cbuf_write(reader->buf, (void *) buf, n, NULL); - } + while((reader = list_next(i))) + n = cbuf_write(reader->buf, (void *) buf, n, NULL); list_iterator_destroy(i); return 0; diff --git a/src/slurmd/job.c b/src/slurmd/job.c index db120bdbc9d..7c221fa1bd1 100644 --- a/src/slurmd/job.c +++ b/src/slurmd/job.c @@ -45,12 +45,12 @@ #include "src/slurmd/job.h" #include "src/slurmd/shm.h" #include "src/slurmd/io.h" +#include "src/slurmd/fname.h" static char ** _array_copy(int n, char **src); static void _array_free(int n, char ***array); static void _srun_info_destructor(void *arg); -static void _job_init_task_info(slurmd_job_t *job, - launch_tasks_request_msg_t *msg); +static void _job_init_task_info(slurmd_job_t *job, uint32_t *gids); /* create a slurmd job structure from a launch tasks message */ @@ -65,16 +65,12 @@ job_create(launch_tasks_request_msg_t *msg, slurm_addr *cli_addr) xassert(msg != NULL); + debug3("entering job_create"); + if ((pwd = getpwuid((uid_t)msg->uid)) < 0) { error("uid %ld not found on system", msg->uid); return NULL; } - - memcpy(&resp_addr, cli_addr, sizeof(slurm_addr)); - memcpy(&io_addr, cli_addr, sizeof(slurm_addr)); - slurm_set_addr(&resp_addr, msg->resp_port, NULL); - slurm_set_addr(&io_addr, msg->io_port, NULL); - job = xmalloc(sizeof(*job)); job->jobid = msg->job_id; @@ -94,9 +90,12 @@ job_create(launch_tasks_request_msg_t *msg, slurm_addr *cli_addr) job->argv = _array_copy(job->argc, msg->argv); job->cwd = xstrdup(msg->cwd); - job->ofname = xstrdup(msg->ofname); - job->efname = msg->efname ? xstrdup(msg->efname) : job->ofname; - job->ifname = xstrdup(msg->ifname); + + memcpy(&resp_addr, cli_addr, sizeof(slurm_addr)); + slurm_set_addr(&resp_addr, msg->resp_port, NULL); + memcpy(&io_addr, cli_addr, sizeof(slurm_addr)); + slurm_set_addr(&io_addr, msg->io_port, NULL); + #ifdef HAVE_LIBELAN3 job->qsw_job = msg->qsw_job; @@ -106,16 +105,22 @@ job_create(launch_tasks_request_msg_t *msg, slurm_addr *cli_addr) srun = srun_info_create((void *)msg->credential->signature, &resp_addr, &io_addr); + srun->ofname = xstrdup(msg->ofname); + srun->efname = xstrdup(msg->efname); + srun->ifname = xstrdup(msg->ifname); - job->sruns = list_create((ListDelF) _srun_info_destructor); + job->sruns = list_create((ListDelF) _srun_info_destructor); list_append(job->sruns, (void *) srun); - _job_init_task_info(job, msg); + _job_init_task_info(job, msg->global_task_ids); return job; } +/* + * return the default output filename for a batch job + */ static char * _mkfilename(slurmd_job_t *job, const char *name) { @@ -125,7 +130,7 @@ _mkfilename(slurmd_job_t *job, const char *name) snprintf(buf, 256, "%s/job%u.out", job->cwd, job->jobid); return xstrdup(buf); } else - return xstrdup(name); + return fname_create(job, name, 0); } slurmd_job_t * @@ -133,7 +138,8 @@ job_batch_job_create(batch_job_launch_msg_t *msg) { struct passwd *pwd; slurmd_job_t *job = xmalloc(sizeof(*job)); - task_info_t *t = task_info_create(0, 0); + srun_info_t *srun = NULL; + uint32_t gid = 0; if ((pwd = getpwuid((uid_t)msg->uid)) < 0) { error("uid %ld not found on system", msg->uid); @@ -147,65 +153,43 @@ job_batch_job_create(batch_job_launch_msg_t *msg) job->uid = (uid_t)msg->uid; job->cwd = xstrdup(msg->work_dir); - job->ofname = _mkfilename(job, msg->out); - job->efname = msg->err ? xstrdup(msg->err): job->ofname; - job->ifname = xstrdup("/dev/null"); - job->envc = msg->envc; job->env = _array_copy(job->envc, msg->environment); job->objs = list_create((ListDelF) io_obj_destroy); job->sruns = list_create((ListDelF) _srun_info_destructor); - job->task = (task_info_t **) xmalloc(sizeof(task_info_t *)); - job->task[0] = t; - t->ofname = xstrdup(job->ofname); - t->efname = xstrdup(job->efname); - t->ifname = xstrdup(job->ifname); + srun = srun_info_create(NULL, NULL, NULL); + + srun->ofname = _mkfilename(job, msg->out); + srun->efname = msg->err ? xstrdup(msg->err) : srun->ofname; + srun->ifname = xstrdup("/dev/null"); + list_append(job->sruns, (void *) srun); job->argc = msg->argc > 0 ? msg->argc : 2; + + /* job script has not yet been written out to disk -- + * argv will be filled in later + */ job->argv = (char **) xmalloc(job->argc * sizeof(char *)); - return job; -} -static int -_wid(uint32_t n) -{ - int width = 1; - while (n /= 10L) - width++; - return width; -} + _job_init_task_info(job, &gid); -static char * -_task_filename_create(const char *basename, int i, int width) -{ - int len = basename ? strlen(basename) : 0; - char buf[len+width+16]; - if (basename == NULL) - return NULL; - snprintf(buf, len+width+16, "%s%0*d", basename, width, i); - return xstrdup(buf); + return job; } - static void -_job_init_task_info(slurmd_job_t *job, launch_tasks_request_msg_t *msg) +_job_init_task_info(slurmd_job_t *job, uint32_t *gid) { int i; int n = job->ntasks; - int wid = _wid(job->nprocs); - uint32_t *gid = msg->global_task_ids; srun_info_t *srun = (srun_info_t *) list_peek(job->sruns); job->task = (task_info_t **) xmalloc(n * sizeof(task_info_t *)); for (i = 0; i < n; i++){ - task_info_t *t = job->task[i] = task_info_create(i, gid[i]); + job->task[i] = task_info_create(i, gid[i]); if (srun != NULL) - list_append(t->srun_list, (void *)srun); - t->ofname = _task_filename_create(job->ofname, t->gid, wid); - t->efname = _task_filename_create(job->efname, t->gid, wid); - t->ifname = _task_filename_create(job->ifname, t->gid, wid); + list_append(job->task[i]->srun_list, (void *)srun); } } @@ -294,11 +278,11 @@ srun_info_create(void *keydata, slurm_addr *resp_addr, slurm_addr *ioaddr) if (keydata != NULL) memcpy((void *) key->data, keydata, SLURM_KEY_SIZE); srun->key = key; + if (ioaddr != NULL) - srun->ioaddr = *ioaddr; - else - srun->noconnect = true; - srun->resp_addr = *resp_addr; + srun->ioaddr = *ioaddr; + if (resp_addr != NULL) + srun->resp_addr = *resp_addr; return srun; } @@ -341,9 +325,6 @@ task_info_create(int taskid, int gtaskid) t->in = NULL; t->out = NULL; t->err = NULL; - t->ifname = NULL; - t->ofname = NULL; - t->efname = NULL; t->srun_list = list_create(NULL); slurm_mutex_unlock(&t->mutex); return t; diff --git a/src/slurmd/job.h b/src/slurmd/job.h index e19d323b7f5..6b561440d71 100644 --- a/src/slurmd/job.h +++ b/src/slurmd/job.h @@ -69,9 +69,6 @@ typedef struct task_info { *out, /* I/O objects used in IO event loop */ *err; int estatus; /* this task's exit status */ - char * ofname; /* output file (if any) */ - char * efname; /* error file (if any) */ - char * ifname; /* input file (if any) */ List srun_list; /* List of srun objs for this task */ } task_info_t; @@ -80,9 +77,10 @@ typedef struct srun_info { srun_key_t *key; /* srun key for IO verification */ slurm_addr resp_addr; /* response addr for task exit msg */ slurm_addr ioaddr; /* Address to connect on for I/O */ + char * ofname; /* output file (if any) */ + char * efname; /* error file (if any) */ + char * ifname; /* input file (if any) */ - bool noconnect; /* don't connect I/O back to srun * - * e.g. for local file I/O only */ } srun_info_t; typedef struct slurmd_job { @@ -102,9 +100,6 @@ typedef struct slurmd_job { #endif uid_t uid; struct passwd *pwd; - char *ifname; - char *ofname; - char *efname; time_t timelimit; task_info_t **task; List objs; diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c index 7012c24f51f..0e41d2bb504 100644 --- a/src/slurmd/mgr.c +++ b/src/slurmd/mgr.c @@ -650,6 +650,7 @@ _exec_all_tasks(slurmd_job_t *job) else _task_exec(job, i, false); + debug3("All tasks exited"); return; } diff --git a/src/srun/Makefile.am b/src/srun/Makefile.am index 7f3e3bdeea3..7659b6de51c 100644 --- a/src/srun/Makefile.am +++ b/src/srun/Makefile.am @@ -12,7 +12,7 @@ bin_PROGRAMS = srun srun_SOURCES = srun.c opt.c env.c opt.h env.h job.c job.h net.c net.h \ msg.c msg.h io.c io.h launch.h launch.c attach.h \ - reattach.c reattach.h + reattach.c reattach.h fname.h fname.c srun_LDADD = $(top_builddir)/src/common/libcommon.la \ $(top_builddir)/src/api/libslurm.la \ $(POPT_LIBS) $(TOTALVIEW_LIBS) diff --git a/src/srun/fname.c b/src/srun/fname.c new file mode 100644 index 00000000000..bb07f67968f --- /dev/null +++ b/src/srun/fname.c @@ -0,0 +1,133 @@ + +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <ctype.h> + +#include "src/srun/job.h" +#include "src/srun/fname.h" +#include "src/srun/opt.h" + +#include "src/common/xmalloc.h" +#include "src/common/xstring.h" +#include "src/common/xassert.h" + +/* + * Max zero-padding width allowed + */ +#define MAX_WIDTH 10 + + +/* + * Fill in as much of filename as possible from srun, update + * filename type to one of the io types ALL, NONE, PER_TASK, ONE + */ +io_filename_t * +fname_create(job_t *job, char *format) +{ + unsigned long int wid = 0; + unsigned long int taskid = 0; + io_filename_t *fname = NULL; + char *p, *q, *name; + + fname = xmalloc(sizeof(*fname)); + + /* Handle special cases + */ + + if (format == NULL) { + fname->type = IO_ALL; + fname->name = NULL; + return fname; + } + + if (strncasecmp(format, "all", (size_t) 3) == 0) { + fname->type = IO_ALL; + fname->name = NULL; + return fname; + } + + if (strncasecmp(format, "none", (size_t) 4) == 0) { + fname->type = IO_NONE; + fname->name = "/dev/null"; + return fname; + } + + if (strncmp(format, "-", (size_t) 1) == 0) { + fname->type = IO_ALL; + fname->name = NULL; + return fname; + } + + taskid = strtoul(format, &p, 10); + if ((*p == '\0') && ((int) taskid < opt.nprocs)) { + fname->type = IO_ONE; + fname->taskid = (uint32_t) taskid; + fname->name = NULL; + return fname; + } + + name = NULL; + q = p = format; + while (*p != '\0') { + if (*p == '%') { + if (isdigit(*(++p))) { + xmemcat(name, q, p - 1); + if ((wid = strtoul(p, &p, 10)) > MAX_WIDTH) + wid = MAX_WIDTH; + q = p - 1; + if (*p == '\0') + break; + } + + switch (*p) { + case 't': /* '%t' => taskid */ + case 'n': /* '%n' => nodeid */ + case 'N': /* '%N' => node name */ + + fname->type = IO_PER_TASK; + if (wid) + xstrfmtcat(name, "%%"); + p++; + break; + + case 'J': /* '%J' => "jobid.stepid" */ + case 'j': /* '%j' => jobid */ + + xmemcat(name, q, p - 1); + xstrfmtcat(name, "%0*d", wid, job->jobid); + + if ((*p == 'J') && (job->stepid != NO_VAL)) + xstrfmtcat(name, ".%d", job->stepid); + q = ++p; + break; + + case 's': /* '%s' => stepid */ + xmemcat(name, q, p - 1); + xstrfmtcat(name, "%0*d", wid, job->stepid); + q = ++p; + break; + + default: + break; + } + wid = 0; + } else + p++; + } + + if (q != p) + xmemcat(name, q, p); + + fname->name = name; + return fname; +} + +void +fname_destroy(io_filename_t *f) +{ + if (f->name) + xfree(f->name); + xfree(f); +} diff --git a/src/srun/fname.h b/src/srun/fname.h new file mode 100644 index 00000000000..d6dfcd0441d --- /dev/null +++ b/src/srun/fname.h @@ -0,0 +1,56 @@ +/*****************************************************************************\ + * src/srun/fname.c - IO filename type implementation (srun specific) + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#ifndef _FNAME_H +#define _FNAME_H + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "src/srun/opt.h" + +typedef struct io_filename { + char *name; + enum io_t type; + int taskid; /* taskid for IO if IO_ONE */ +} io_filename_t; + +/* need to predeclare srun_job to resolve declaration dependencies + */ +typedef struct srun_job * srun_job_t; + +/* + * Create an filename from a (probably user supplied) filename format. + * fname_create() will expand the format as much as possible for srun, + * leaving node or task specific format specifiers for the remote + * slurmd to handle. + */ +io_filename_t * fname_create(srun_job_t job, char *format); +void fname_destroy(io_filename_t *fname); + +#endif /* !_FNAME_H */ + diff --git a/src/srun/io.c b/src/srun/io.c index 4e4f1a7815c..ca265a8ec16 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -55,6 +55,7 @@ #define POLL_TIMEOUT_MSEC 500 /* max wait for i/o poll, msec */ static time_t time_first_done = 0; +static int fmt_width = 0; /* fd_info struct used in poll() loop to map fds back to task number, * appropriate output type (stdout/err), and original fd @@ -72,6 +73,8 @@ static int _close_stream(int *fd, FILE *out, int tasknum); static void _do_poll_timeout(job_t *job); static int _do_task_output(int *fd, FILE *out, cbuf_t buf, int tasknum); static int _do_task_output_poll(fd_info_t *info); +static int _do_task_input(job_t *job, int taskid); +static int _do_task_input_poll(job_t *job, fd_info_t *info); static int _handle_pollerr(fd_info_t *info); static char * _host_state_name(host_state_t state_inx); static ssize_t _readn(int fd, void *buf, size_t nbytes); @@ -93,12 +96,22 @@ static int _validate_header(slurm_io_stream_header_t *hdr, job_t *job); #define _poll_wr_isset(pfd) ((pfd).revents & POLLOUT) #define _poll_err(pfd) ((pfd).revents & POLLERR) +/* True if an EOF needs to be broadcast to all tasks + */ +static bool stdin_got_eof = false; + static int _do_task_output_poll(fd_info_t *info) { return _do_task_output(info->fd, info->fp, info->buf, info->taskid); } +static int +_do_task_input_poll(job_t *job, fd_info_t *info) +{ + return _do_task_input(job, info->taskid); +} + static int _handle_pollerr(fd_info_t *info) { @@ -107,8 +120,11 @@ _handle_pollerr(fd_info_t *info) socklen_t size = sizeof(int); if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&err, &size) < 0) error("_handle_error_poll: getsockopt: %m"); - else if (err) - error("poll error on fd %d: %s", fd, slurm_strerror(err)); + else if (err) { + if ((err != EPIPE) && (err != ECONNRESET)) + error("poll error on fd %d: %s", fd, slurm_strerror(err)); + _close_stream(info->fd, info->fp, info->taskid); + } if (*info->fd > 0) return _do_task_output_poll(info); @@ -132,7 +148,7 @@ _io_thr_poll(void *job_arg) nfds_t nfds = 0; int numfds = (opt.nprocs*2) + job->niofds + 2; fd_info_t map[numfds]; /* map fd in pollfd array to fd info */ - int i, rc; + int i, rc, out_fd_state, err_fd_state; xassert(job != NULL); @@ -143,26 +159,51 @@ _io_thr_poll(void *job_arg) _set_iofds_nonblocking(job); + if (job->ofname->type == IO_ALL) + out_fd_state = WAITING_FOR_IO; + else { + if (job->ifname->type != IO_ALL) + out_fd_state = IO_DONE; + else + out_fd_state = WAITING_FOR_IO; + + if (!job->efname->name) + err_fd_state = IO_DONE; + } + + if (job->efname->name == IO_ALL && opt.efname) { + err_fd_state = WAITING_FOR_IO; + } else + err_fd_state = IO_DONE; + for (i = 0; i < opt.nprocs; i++) { - job->out[i] = WAITING_FOR_IO; - job->err[i] = WAITING_FOR_IO; + job->out[i] = out_fd_state; + job->err[i] = err_fd_state; } - for (i = 0; i < job->niofds; i++) { + for (i = 0; i < job->niofds; i++) _poll_set_rd(fds[i], job->iofd[i]); - } - _poll_set_rd(fds[i], STDIN_FILENO); while (1) { int eofcnt = 0; - nfds = job->niofds+1; /* already have n ioport fds + stdin */ + nfds = job->niofds; /* already have n ioport fds + stdin */ + + if (job->stdinfd >= 0) { + _poll_set_rd(fds[nfds], job->stdinfd); + nfds++; + } for (i = 0; i < opt.nprocs; i++) { if (job->out[i] > 0) { _poll_set_rd(fds[nfds], job->out[i]); + + if ( (cbuf_used(job->inbuf[i]) > 0) + || (stdin_got_eof && !job->stdin_eof[i])) + _poll_set_wr(fds[nfds], job->out[i]); + map[nfds].taskid = i; map[nfds].fd = &job->out[i]; - map[nfds].fp = stdout; + map[nfds].fp = job->outstream; map[nfds].buf = job->outbuf[i]; nfds++; } @@ -171,19 +212,18 @@ _io_thr_poll(void *job_arg) _poll_set_rd(fds[nfds], job->err[i]); map[nfds].taskid = i; map[nfds].fd = &job->err[i]; - map[nfds].fp = stderr; + map[nfds].fp = job->errstream; map[nfds].buf = job->errbuf[i]; nfds++; } - if ((job->out[i] == IO_DONE) && - (job->err[i] == IO_DONE)) + if ( (job->out[i] == IO_DONE) + && (job->err[i] == IO_DONE) ) eofcnt++; } /* exit if we have received EOF on all streams */ if (eofcnt) { - debug3("eofcnt == %d", eofcnt); if (eofcnt == opt.nprocs) pthread_exit(0); if (time_first_done == 0) @@ -222,19 +262,22 @@ _io_thr_poll(void *job_arg) } } - if (_poll_rd_isset(fds[i++])) - _bcast_stdin(STDIN_FILENO, job); + if ((job->stdinfd >= 0) && _poll_rd_isset(fds[i++])) + _bcast_stdin(job->stdinfd, job); for ( ; i < nfds; i++) { unsigned short revents = fds[i].revents; xassert(!(revents & POLLNVAL)); - if ((revents & POLLERR) || - (revents & POLLHUP) || - (revents & POLLNVAL)) + if ( (revents & POLLERR) + || (revents & POLLHUP) + || (revents & POLLNVAL)) _handle_pollerr(&map[i]); if ((revents & POLLIN) && (*map[i].fd > 0)) _do_task_output_poll(&map[i]); + + if ((revents & POLLOUT) && (*map[i].fd > 0)) + _do_task_input_poll(job, &map[i]); } } xfree(fds); @@ -255,15 +298,16 @@ static void _do_poll_timeout (job_t *job) if (job->state == SRUN_JOB_FAILED) pthread_exit(0); else if (time_first_done && opt.max_wait && - (age > opt.max_wait)) { - error("First task termination %d seconds ago", age); + (age >= opt.max_wait) + ) { + error("First task terminated %d seconds ago", age); error("Terminating remaining tasks now"); report_task_status(job); update_job_state(job, SRUN_JOB_FAILED); pthread_exit(0); } else if (time_first_done && (term_msg_sent == false) && - (age > MAX_TERM_WAIT_SEC)) { - info("Warning: First task termination %d seconds ago", age); + (age >= MAX_TERM_WAIT_SEC)) { + info("Warning: First task terminated %d seconds ago", age); term_msg_sent = true; } } @@ -335,6 +379,53 @@ static char *_task_state_name(task_state_t state_inx) } } +static int +_stdin_open(char *filename) +{ + int fd; + int flags = O_RDONLY; + + xassert(filename != NULL); + + if ((fd = open(filename, flags, 0644)) < 0) { + error ("Unable to open `%s': %m", filename); + return -1; + } + fd_set_nonblocking(fd); + fd_set_close_on_exec(fd); + return fd; +} + +static FILE * +_fopen(char *filename) +{ + FILE *fp; + if (!(fp = fopen(filename, "w"))) + error ("Unable to open file `%s': %m", filename); + + return fp; +} + +static void +_open_streams(job_t *job) +{ + if ((job->ifname->type != IO_PER_TASK) && job->ifname->name) + job->stdinfd = _stdin_open(job->ifname->name); + else + job->stdinfd = STDIN_FILENO; + + if ((job->ofname->type != IO_PER_TASK) && job->ofname->name) + job->outstream = _fopen(job->ofname->name); + else + job->outstream = stdout; + + if ((job->efname->type != IO_PER_TASK) && job->efname->name) + job->errstream = _fopen(job->ifname->name); + else + job->errstream = stderr; + +} + void * io_thr(void *arg) @@ -342,12 +433,24 @@ io_thr(void *arg) return _io_thr_poll(arg); } +static int +_wid(int n) +{ + int width = 1; + while (n /= 10) + width++; + return width; +} + int io_thr_create(job_t *job) { int i; pthread_attr_t attr; + if (opt.labelio) + fmt_width = _wid(opt.nprocs); + for (i = 0; i < job->niofds; i++) { if (net_stream_listen(&job->iofd[i], &job->ioport[i]) < 0) fatal("unable to initialize stdio server port: %m"); @@ -356,6 +459,8 @@ io_thr_create(job_t *job) net_set_low_water(job->iofd[i], 140); } + _open_streams(job); + pthread_attr_init(&attr); if ((errno = pthread_create(&job->ioid, &attr, &io_thr, (void *) job))) fatal("Unable to create io thread: %m"); @@ -473,7 +578,7 @@ _do_task_output(int *fd, FILE *out, cbuf_t buf, int tasknum) while ((len = cbuf_read_line(buf, line, sizeof(line), 1))) { if (opt.labelio) - fprintf(out, "%d: ", tasknum); + fprintf(out, "%0*d: ", fmt_width, tasknum); fputs(line, out); fflush(out); } @@ -481,6 +586,28 @@ _do_task_output(int *fd, FILE *out, cbuf_t buf, int tasknum) return len; } +static int +_do_task_input(job_t *job, int taskid) +{ + int len = 0; + cbuf_t buf = job->inbuf[taskid]; + int fd = job->out[taskid]; + + if ( stdin_got_eof + && !job->stdin_eof[taskid] + && (cbuf_used(buf) == 0) ) { + /* write(fd, &eot, 1); */ + job->stdin_eof[taskid] = true; + shutdown(job->out[taskid], SHUT_WR); + return 0; + } + + if ((len = cbuf_read_to_fd(buf, fd, -1)) < 0) + error ("writing stdin data; %m"); + + return len; +} + static ssize_t _readx(int fd, char *buf, size_t maxbytes) { @@ -492,13 +619,10 @@ _readx(int fd, char *buf, size_t maxbytes) goto again; if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) - return 0; + return -1; error("readx fd %d: %m", fd, n); return -1; /* shutdown socket, cleanup. */ } - /* null terminate */ - buf[n] = '\0'; - return n; } @@ -531,30 +655,29 @@ _readn(int fd, void *buf, size_t nbytes) static void _bcast_stdin(int fd, job_t *job) { - int i, disc=0; - char buf[IO_BUFSIZ]; + int i; size_t len; + char buf[4096]; - if ((len = _readx(fd, buf, IO_BUFSIZ)) <= 0) { - if (len == 0) /* got EOF */ - buf[len++] = 4; - else { + if ((len = _readx(fd, buf, 4096)) <= 0) { + if (len == 0) { /* got EOF */ + close(job->stdinfd); + job->stdinfd = IO_DONE; + stdin_got_eof = true; + return; + } else { error("error reading stdin. %m"); return; } } - /* broadcast to every connected task */ - for (i = 0; i < opt.nprocs; i++) { - if ((job->out[i] == WAITING_FOR_IO) || - (job->out[i] == IO_DONE)) - disc++; - else - write(job->out[i], buf, len); + if (job->ifname->type == IO_ONE) { + i = job->ifname->taskid; + cbuf_write(job->inbuf[i], buf, len, NULL); + } else { + for (i = 0; i < opt.nprocs; i++) + cbuf_write(job->inbuf[i], buf, len, NULL); } - if (disc) - error("Stdin could not be sent to %d disconnected tasks", - disc); return; } diff --git a/src/srun/job.c b/src/srun/job.c index f7dd9a4076b..3a99079a788 100644 --- a/src/srun/job.c +++ b/src/srun/job.c @@ -37,13 +37,197 @@ #include "src/srun/job.h" #include "src/srun/opt.h" +#include "src/srun/fname.h" + +typedef struct allocation_info { + uint32_t jobid; + uint32_t stepid; + char *nodelist; + slurm_addr *addrs; + int *cpus_per_node; + int *cpu_count_reps; +} allocation_info_t; + + +static inline int +_estimate_nports(int nclients, int cli_per_port) +{ + div_t d; + d = div(nclients, cli_per_port); + return d.rem > 0 ? d.quot + 1 : d.quot; +} + + +static job_t * +_job_create_internal(allocation_info_t *info) +{ + int i; + int cpu_cnt = 0; + int cpu_inx = 0; + int tph = 0; + hostlist_t hl; + job_t *job; + + job = xmalloc(sizeof(*job)); + + slurm_mutex_init(&job->state_mutex); + pthread_cond_init(&job->state_cond, NULL); + job->state = SRUN_JOB_INIT; + + job->nodelist = xstrdup(info->nodelist); + hl = hostlist_create(job->nodelist); + job->nhosts = hostlist_count(hl); + + job->jobid = info->jobid; + job->stepid = info->stepid; + + job->slurmd_addr = xmalloc(job->nhosts * sizeof(slurm_addr)); + memcpy(job->slurmd_addr, info->addrs, sizeof(slurm_addr)*job->nhosts); + + job->host = (char **) xmalloc(job->nhosts * sizeof(char *)); + job->cpus = (int *) xmalloc(job->nhosts * sizeof(int) ); + job->ntask = (int *) xmalloc(job->nhosts * sizeof(int) ); + + /* Compute number of file descriptors / Ports needed for Job + * control info server + */ + job->njfds = _estimate_nports(opt.nprocs, 48); + job->jfd = (slurm_fd *) xmalloc(job->njfds * sizeof(slurm_fd)); + job->jaddr = (slurm_addr *) xmalloc(job->njfds * sizeof(slurm_addr)); + + debug3("njfds = %d", job->njfds); + + /* Compute number of IO file descriptors needed and allocate + * memory for them + */ + job->niofds = _estimate_nports(opt.nprocs, 64); + job->iofd = (int *) xmalloc(job->niofds * sizeof(int)); + job->ioport = (int *) xmalloc(job->niofds * sizeof(int)); + + /* ntask stdout and stderr fds */ + job->out = (int *) xmalloc(opt.nprocs * sizeof(int)); + job->err = (int *) xmalloc(opt.nprocs * sizeof(int)); + + /* ntask cbufs for stdout and stderr */ + job->outbuf = (cbuf_t *) xmalloc(opt.nprocs * sizeof(cbuf_t)); + job->errbuf = (cbuf_t *) xmalloc(opt.nprocs * sizeof(cbuf_t)); + job->inbuf = (cbuf_t *) xmalloc(opt.nprocs * sizeof(cbuf_t)); + job->stdin_eof = (bool *) xmalloc(opt.nprocs * sizeof(bool)); + + + /* nhost host states */ + job->host_state = xmalloc(job->nhosts * sizeof(host_state_t)); + + /* ntask task states and statii*/ + job->task_state = xmalloc(opt.nprocs * sizeof(task_state_t)); + job->tstatus = xmalloc(opt.nprocs * sizeof(int)); + + for (i = 0; i < opt.nprocs; i++) { + job->task_state[i] = SRUN_TASK_INIT; + job->outbuf[i] = cbuf_create(1024, 1048576); + job->errbuf[i] = cbuf_create(1024, 1048576); + job->inbuf[i] = cbuf_create(256, 8192); + job->stdin_eof[i] = false; + } + + slurm_mutex_init(&job->task_mutex); + + /* tasks per host, round up */ + tph = (opt.nprocs+job->nhosts-1) / job->nhosts; + + for(i = 0; i < job->nhosts; i++) { + job->host[i] = hostlist_shift(hl); + + if (opt.overcommit) + job->cpus[i] = tph; + else + job->cpus[i] = info->cpus_per_node[cpu_inx]; + + if ((++cpu_cnt) >= info->cpu_count_reps[cpu_inx]) { + /* move to next record */ + cpu_inx++; + cpu_cnt = 0; + } + } + + job->ifname = fname_create(job, opt.ifname); + job->ofname = fname_create(job, opt.ofname); + job->efname = opt.efname ? fname_create(job, opt.efname) : job->ofname; + + hostlist_destroy(hl); + + return job; +} + +job_t * +job_create_allocation(resource_allocation_response_msg_t *resp) +{ + job_t *job; + allocation_info_t *info = xmalloc(sizeof(*info)); + + info->nodelist = resp->node_list; + info->jobid = resp->job_id; + info->stepid = NO_VAL; + info->cpus_per_node = resp->cpus_per_node; + info->cpu_count_reps = resp->cpu_count_reps; + info->addrs = resp->node_addr; + + job = _job_create_internal(info); + + xfree(info); + + return (job); +} + +job_t * +job_create_noalloc(void) +{ + job_t *job; + allocation_info_t *info = xmalloc(sizeof(*info)); + int cpn = 1; + int i; + hostlist_t hl = hostlist_create(opt.nodelist); + + if (!hl) { + error("Invalid node list `%s' specified", opt.nodelist); + goto error; + } + + srand48(getpid()); + info->jobid = (uint32_t) (lrand48() % 65550L + 1L); + info->stepid = 0; + info->nodelist = opt.nodelist; + + if (opt.nprocs < hostlist_count(hl)) + opt.nprocs = hostlist_count(hl); + hostlist_destroy(hl); + + info->cpus_per_node = &cpn; + info->cpu_count_reps = &opt.nprocs; + + /* + * Create job, then fill in host addresses + */ + job = _job_create_internal(info); + + job->slurmd_addr = xmalloc(job->nhosts * sizeof(slurm_addr)); + for (i = 0; i < job->nhosts; i++) { + slurm_set_addr ( &job->slurmd_addr[i], + slurm_get_slurmd_port(), + job->host[i] ); + } + + error: + xfree(info); + return (job); + +} job_t * job_create(resource_allocation_response_msg_t *resp) { int i, cpu_cnt = 0, cpu_inx = 0; - int ntask, tph; /* ntasks left to assign and tasks per host */ - int ncpu; + int ntask, tph; /* ntasks left to assign and tasks per host */ div_t d; hostlist_t hl; job_t *job = (job_t *) xmalloc(sizeof(*job)); @@ -57,10 +241,8 @@ job_create(resource_allocation_response_msg_t *resp) job->nodelist = xstrdup(resp->node_list); hl = hostlist_create(resp->node_list); job->jobid = resp->job_id; - ncpu = *resp->cpus_per_node; job->nhosts = hostlist_count(hl); - job->slurmd_addr = (slurm_addr *) xmalloc(job->nhosts * - sizeof(slurm_addr)); + job->slurmd_addr = xmalloc(job->nhosts * sizeof(slurm_addr)); memcpy(job->slurmd_addr, resp->node_addr, sizeof(job->slurmd_addr[0])*job->nhosts); } else { @@ -75,7 +257,6 @@ job_create(resource_allocation_response_msg_t *resp) hl = hostlist_create(opt.nodelist); srand48(getpid()); job->jobid = (uint32_t) (lrand48() % 65550L + 1L); - ncpu = 1; if (opt.nprocs <= 1) opt.nprocs = hostlist_count(hl); job->nhosts = hostlist_count(hl); @@ -83,6 +264,7 @@ job_create(resource_allocation_response_msg_t *resp) sizeof(slurm_addr)); } + job->host = (char **) xmalloc(job->nhosts * sizeof(char *)); job->cpus = (int *) xmalloc(job->nhosts * sizeof(int) ); job->ntask = (int *) xmalloc(job->nhosts * sizeof(int) ); @@ -110,8 +292,10 @@ job_create(resource_allocation_response_msg_t *resp) job->err = (int *) xmalloc(opt.nprocs * sizeof(int)); /* ntask cbufs for stdout and stderr */ - job->outbuf = (cbuf_t *) xmalloc(opt.nprocs * sizeof(cbuf_t)); - job->errbuf = (cbuf_t *) xmalloc(opt.nprocs * sizeof(cbuf_t)); + job->outbuf = (cbuf_t *) xmalloc(opt.nprocs * sizeof(cbuf_t)); + job->errbuf = (cbuf_t *) xmalloc(opt.nprocs * sizeof(cbuf_t)); + job->inbuf = (cbuf_t *) xmalloc(opt.nprocs * sizeof(cbuf_t)); + job->stdin_eof = (bool *) xmalloc(opt.nprocs * sizeof(bool)); /* nhost host states */ job->host_state = xmalloc(job->nhosts * sizeof(host_state_t)); @@ -124,6 +308,8 @@ job_create(resource_allocation_response_msg_t *resp) job->task_state[i] = SRUN_TASK_INIT; job->outbuf[i] = cbuf_create(1024, 1048576); job->errbuf[i] = cbuf_create(1024, 1048576); + job->inbuf[i] = cbuf_create(256, 8192); + job->stdin_eof[i] = false; } @@ -160,6 +346,10 @@ job_create(resource_allocation_response_msg_t *resp) } + job->ifname = fname_create(job, opt.ifname); + job->ofname = fname_create(job, opt.ofname); + job->efname = opt.efname ? fname_create(job, opt.efname) : job->ofname; + return job; } diff --git a/src/srun/job.h b/src/srun/job.h index b2c72a6b088..cf21923f8d7 100644 --- a/src/srun/job.h +++ b/src/srun/job.h @@ -12,6 +12,9 @@ #include "src/common/macros.h" #include "src/common/cbuf.h" #include "src/api/slurm.h" +#include "src/common/slurm_protocol_defs.h" + +#include "src/srun/fname.h" typedef enum { SRUN_JOB_INIT = 0, @@ -74,8 +77,9 @@ typedef struct srun_job { */ cbuf_t *outbuf; cbuf_t *errbuf; + cbuf_t *inbuf; /* buffer for stdin data */ - pthread_t lid; /* launch thread id */ + pthread_t lid; /* launch thread id */ host_state_t *host_state; /* nhost host states */ @@ -86,6 +90,15 @@ typedef struct srun_job { #ifdef HAVE_LIBELAN3 qsw_jobinfo_t qsw_job; #endif + io_filename_t *ifname; + io_filename_t *ofname; + io_filename_t *efname; + + /* Output streams and stdin fileno */ + FILE *outstream; + FILE *errstream; + int stdinfd; + bool *stdin_eof; /* true if task i processed stdin eof */ } job_t; diff --git a/src/srun/launch.c b/src/srun/launch.c index bc76fd263a1..73d8ed462a1 100644 --- a/src/srun/launch.c +++ b/src/srun/launch.c @@ -119,9 +119,17 @@ launch(void *arg) for (i = 0; i < job->nhosts; i++) task_ids[i] = (uint32_t *) xmalloc(job->cpus[i] * sizeof(uint32_t)); + + if (opt.distribution == SRUN_DIST_UNKNOWN) { + if (opt.nprocs <= job->nhosts) + opt.distribution = SRUN_DIST_CYCLIC; + else + opt.distribution = SRUN_DIST_BLOCK; + } + if (opt.distribution == SRUN_DIST_BLOCK) _dist_block(job, task_ids); - else + else _dist_cyclic(job, task_ids); msg_array_ptr = (launch_tasks_request_msg_t *) @@ -147,12 +155,12 @@ launch(void *arg) r->nnodes = job->nhosts; r->nprocs = opt.nprocs; - if (opt.output == IO_PER_TASK) - r->ofname = opt.ofname; - if (opt.error == IO_PER_TASK) - r->efname = opt.efname; - if (opt.input == IO_PER_TASK) - r->ifname = opt.ifname; + if (job->ofname->type == IO_PER_TASK) + r->ofname = job->ofname->name; + if (job->efname->type == IO_PER_TASK) + r->efname = job->efname->name; + if (job->ifname->type == IO_PER_TASK) + r->ifname = job->ifname->name; /* Node specific message contents */ r->tasks_to_launch = job->ntask[i]; diff --git a/src/srun/msg.c b/src/srun/msg.c index 2f2faeeca72..1bf9007bc39 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -146,12 +146,29 @@ _reattach_handler(job_t *job, slurm_msg_t *msg) { reattach_tasks_response_msg_t *resp = msg->data; - slurm_mutex_lock(&job->task_mutex); - if ((resp->srun_node_id >= 0) && (resp->srun_node_id < job->nhosts)) { - job->host_state[resp->srun_node_id] = SRUN_HOST_REPLIED; + if ((resp->srun_node_id < 0) || (resp->srun_node_id >= job->nhosts)) { + error ("Invalid reattach response recieved~"); + return; } + + slurm_mutex_lock(&job->task_mutex); + job->host_state[resp->srun_node_id] = SRUN_HOST_REPLIED; slurm_mutex_unlock(&job->task_mutex); + if (resp->return_code != 0) { + if (job->stepid == NO_VAL) { + error ("Unable to attach to job %d: %s", + job->jobid, slurm_strerror(resp->return_code)); + update_job_state(job, SRUN_JOB_FAILED); + } else { + error ("Unable to attach to step %d.%d on node %d", + job->jobid, job->stepid, resp->srun_node_id); + } + } + + if (job->stepid == NO_VAL) + update_job_state(job, SRUN_JOB_OVERDONE); + } static void diff --git a/src/srun/opt.c b/src/srun/opt.c index e16ce4d308a..32a07e36116 100644 --- a/src/srun/opt.c +++ b/src/srun/opt.c @@ -265,9 +265,6 @@ env_vars_t env_vars[] = { {"SLURM_NNODES", OPT_INT, &opt.nodes, &opt.nodes_set}, {"SLURM_OVERCOMMIT", OPT_OVERCOMMIT, NULL, NULL}, {"SLURM_PARTITION", OPT_STRING, &opt.partition, NULL}, - {"SLURM_STDINMODE", OPT_INPUT, &opt.input, NULL}, - {"SLURM_STDOUTMODE", OPT_OUTPUT, &opt.output, NULL}, - {"SLURM_STDERRMODE", OPT_ERROR, &opt.error, NULL}, {"SLURM_DISTRIBUTION", OPT_DISTRIB, NULL, NULL}, {"SLURM_WAIT", OPT_INT, &opt.max_wait, NULL}, {NULL, 0, NULL} @@ -313,11 +310,9 @@ static void _opt_list(void); static char * _search_path(char *); static char * _find_file_path (char *fname); -static enum io_t _verify_iotype(char **name); static void _print_version(void); static enum distribution_t _verify_dist_type(const char *arg); static long _to_bytes(const char *arg); -static void _set_allocate_mode_env_vars(void); static List _create_path_list(void); /*---[ end forward declarations of static functions ]---------------------*/ @@ -345,35 +340,6 @@ static void _print_version(void) printf("%s %s\n", PACKAGE, VERSION); } -/* - * _verify_iotype(): helper for output/input/error arguments. - * - * will return IO_NORMAL if string matches "normal" - * - * prunes off trailing '%' char after setting IO_PER_TASK if such - * a one exists. - */ -static enum io_t _verify_iotype(char **name) -{ - enum io_t type; - char *p = *name; - int end = strlen(p) - 1; - - /* name must have form "file.%" to be IO_PER_TASK */ - if (p[end] == '%') { - type = IO_PER_TASK; - p[end] = '\0'; /* no longer need % char */ - - } else if (strncasecmp(p, "normal", (size_t) 6) != 0) { - type = IO_ALL; - } else if (strncasecmp(p, "none", (size_t) 4) != 0) { - type = IO_NONE; - } else { - type = IO_NORMAL; - } - - return type; -} /* * verify that a distribution type in arg is of a known form @@ -463,35 +429,6 @@ static long _to_bytes(const char *arg) } -/* set a few env vars for allocate mode so they'll be available in - * the resulting subshell - */ -static void _set_allocate_mode_env_vars(void) -{ - int rc; - - if (opt.output == IO_ALL) { - /* all output to single file */ - rc = setenvf("SLURM_OUTPUT=%s", opt.ofname); - } else if (opt.output == IO_PER_TASK) { - /* output is per task, need to put '%' char back */ - rc = setenvf("SLURM_OUTPUT=%s%%", opt.ofname); - } - - if (opt.error == IO_ALL) { - rc = setenvf("SLURM_ERROR=%s", opt.efname); - } else if (opt.output == IO_PER_TASK) { - rc = setenvf("SLURM_ERROR=%s%%", opt.efname); - } - - if (opt.input == IO_ALL) { - rc = setenvf("SLURM_INPUT=%s", opt.ifname); - } else if (opt.input == IO_PER_TASK) { - rc = setenvf("SLURM_INPUT=%s%%", opt.ifname); - } - -} - /* * print error message to stderr with opt.progname prepended */ @@ -548,9 +485,6 @@ static void _opt_default() opt.distribution = SRUN_DIST_UNKNOWN; - opt.output = IO_NORMAL; - opt.input = IO_NORMAL; - opt.error = IO_NORMAL; opt.ofname = NULL; opt.ifname = NULL; opt.efname = NULL; @@ -761,17 +695,14 @@ static void _opt_args(int ac, char **av) case OPT_OUTPUT: opt.ofname = strdup(arg); - opt.output = _verify_iotype(&opt.ofname); break; case OPT_INPUT: opt.ifname = strdup(arg); - opt.input = _verify_iotype(&opt.ifname); break; case OPT_ERROR: opt.efname = strdup(arg); - opt.error = _verify_iotype(&opt.efname); break; case OPT_DISTRIB: @@ -907,20 +838,9 @@ _opt_verify(poptContext optctx) } else { /* mode != MODE_ATTACH */ - if (mode == MODE_ALLOCATE) { - - /* set output/input/err (an whatever else) as - * env vars so they will be "defaults" in allocate - * subshell - */ - _set_allocate_mode_env_vars(); - - } else { - - if (remote_argc == 0) { - error("Error: must supply remote command"); - verified = false; - } + if ((remote_argc == 0) && (mode != MODE_ALLOCATE)) { + error("must supply remote command"); + verified = false; } @@ -1099,33 +1019,6 @@ _find_file_path (char *fname) #if __DEBUG -/* generate meaningful output message based on io type and "filename" */ -char *print_io_t_with_filename(enum io_t type, char *filename) -{ - char buf[256]; - - switch (type) { - case IO_ALL: - snprintf(buf, 256, "%s (file `%s')", - format_io_t(type), filename); - break; - - case IO_PER_TASK: - snprintf(buf, 256, "%s (file `%s<task_id>')", - format_io_t(type), filename); - break; - - case IO_NORMAL: - snprintf(buf, 256, "normal"); - break; - - default: - snprintf(buf, 256, "error, unknown type"); - break; - } - return strdup(buf); -} - /* helper function for printing options * * warning: returns pointer to memory allocated on the stack. @@ -1190,12 +1083,6 @@ void _opt_list() opt.partition == NULL ? "default" : opt.partition); info("job name : `%s'", opt.job_name); info("distribution : %s", format_distribution_t(opt.distribution)); - info("output : %s", - print_io_t_with_filename(opt.output, opt.ofname)); - info("error : %s", - print_io_t_with_filename(opt.error, opt.efname)); - info("input : %s", - print_io_t_with_filename(opt.input, opt.ifname)); info("core format : %s", opt.core_format); info("verbose : %d", _verbose); info("debug : %d", _debug); diff --git a/src/srun/opt.h b/src/srun/opt.h index 02fe2ece13e..92410f345e2 100644 --- a/src/srun/opt.h +++ b/src/srun/opt.h @@ -82,13 +82,13 @@ enum distribution_t { "unknown" enum io_t { - IO_NORMAL = 0, - IO_ALL = 1, - IO_PER_TASK = 2, - IO_NONE = 3, + IO_ALL = 0, /* multiplex output from all/bcast stdin to all */ + IO_ONE = 1, /* output from only one task/stdin to one task */ + IO_PER_TASK = 2, /* separate output/input file per task */ + IO_NONE = 3, /* close output/close stdin */ }; -#define format_io_t(t) (t == IO_NORMAL) ? "normal" : (t == IO_ALL) ? \ +#define format_io_t(t) (t == IO_ONE) ? "one" : (t == IO_ALL) ? \ "all" : "per task" typedef struct srun_options { @@ -111,14 +111,9 @@ typedef struct srun_options { distribution; /* --distribution=, -m dist */ char *job_name; /* --job-name=, -J name */ - enum io_t output; /* --output=, -o type */ - char *ofname; /* output filename if PER_TASK */ - - enum io_t input; /* --input=, -i type */ - char *ifname; /* input filename if PER_TASK */ - - enum io_t error; /* --error=, -e type */ - char *efname; /* stderr filename if PER_TASK */ + char *ofname; /* --output -o filename */ + char *ifname; /* --input -i filename */ + char *efname; /* --error, -e filename */ char *core_format; /* --corefile-format=, -C type */ char *attach; /* --attach=id -a id */ diff --git a/src/srun/reattach.c b/src/srun/reattach.c index 2e4d909da48..9e23bcaba67 100644 --- a/src/srun/reattach.c +++ b/src/srun/reattach.c @@ -182,10 +182,10 @@ _step_list_create(char *steplist) } -static void +static int _get_job_info(srun_step_t *s) { - int i; + int i, rc = -1; job_info_msg_t *resp = NULL; job_info_t *job = NULL; hostlist_t hl; @@ -201,31 +201,30 @@ _get_job_info(srun_step_t *s) job = &resp->job_array[i]; if (job->job_id != s->jobid) continue; - goto done; } if (job == NULL) { error ("Cannot find job %d", s->jobid); goto done; } + if (job->job_state != JOB_RUNNING) { error ("Cannot attach to %s job %d", job_state_string(job->job_state), s->jobid); goto done; } - /* For attaching to a job, we assume that it is meant - * that we are attaching to a job script, which *should* - * be running on the first node of the allocation. If it - * is not, we will get an error attempting to attach, but - * this is not too bad, since the output will be going to - * a file anyway. - */ + if (!job->batch_flag) { + rc = 0; + goto done; + } + if (!(hl = hostlist_create(job->nodes))) { error ("Unable to create hostlist from `%s'", job->nodes); goto done; } + rc = 0; s->nodes = hostlist_shift(hl); s->ntasks = 1; @@ -234,7 +233,7 @@ _get_job_info(srun_step_t *s) done: if (resp) slurm_free_job_info_msg(resp); - return; + return rc; } static void @@ -267,10 +266,20 @@ _get_step_info(srun_step_t *s) static void _get_attach_info(srun_step_t *s) { - if (s->stepid == NO_VAL) - _get_job_info(s); - else + if (s->stepid == NO_VAL) { + if (_get_job_info(s) < 0) + return; + + /* If job was not a batch job, try step 0 + */ + if (s->nodes == NULL) { + s->stepid = 0; + _get_step_info(s); + } + + } else { _get_step_info(s); + } } static int @@ -450,6 +459,8 @@ int reattach() _get_attach_info(s); + opt.ifname = "none"; + if ((opt.nodelist = s->nodes) == NULL) exit(1); @@ -482,6 +493,10 @@ int reattach() pthread_cond_wait(&job->state_cond, &job->state_mutex); slurm_mutex_unlock(&job->state_mutex); + if (job->state == SRUN_JOB_FAILED) { + error ("Attach to job failed!"); + } + pthread_kill(job->jtid, SIGTERM); pthread_join(job->ioid, NULL); diff --git a/src/srun/srun.c b/src/srun/srun.c index aebdcd5de76..bd67c400a4c 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -122,12 +122,16 @@ static int _set_batch_script_env(uint32_t jobid); static void _qsw_standalone(job_t *job); #endif +/* empty sigint handler */ +static void +_int_handler(int signal) +{ pthread_cancel(pthread_self());} + int main(int ac, char **av) { sigset_t sigset; allocation_resp *resp; - alloc_run_resp *run_resp; job_t *job; pthread_attr_t attr; struct sigaction action; @@ -258,12 +262,15 @@ main(int ac, char **av) /* kill msg server thread */ pthread_kill(job->jtid, SIGTERM); + /* wait for stdio */ + xsignal(SIGQUIT, &_int_handler); + if (pthread_join(job->ioid, NULL) < 0) { + error ("Waiting on IO: %m"); + } + /* kill signal thread */ pthread_cancel(job->sigid); - /* wait for stdio */ - pthread_join(job->ioid, NULL); - if (old_job) { debug("cancelling job step %u.%u", job->jobid, job->stepid); slurm_complete_job_step(job->jobid, job->stepid, 0, 0); @@ -486,6 +493,7 @@ _print_job_information(allocation_resp *resp) info("%s",job_details); } + static void _sigterm_handler(int signum) { @@ -502,7 +510,6 @@ _sig_thr(void *arg) job_t *job = (job_t *)arg; sigset_t set; time_t last_intr = 0; - bool suddendeath = false; int signo; while (1) { @@ -518,7 +525,8 @@ _sig_thr(void *arg) switch (signo) { case SIGINT: if ((time(NULL) - last_intr) > 1) { - info("interrupt (one more within 1 sec to abort)"); + info("interrupt (one more within 1 " + "sec to abort)"); report_task_status(job); last_intr = time(NULL); } else { /* second Ctrl-C in half as many seconds */ @@ -528,20 +536,18 @@ _sig_thr(void *arg) last_intr = time(NULL); _fwd_signal(job, signo); } else { - pthread_mutex_lock(&job->state_mutex); - if (job->state != SRUN_JOB_OVERDONE) { - info("forcing termination"); - job->state = SRUN_JOB_OVERDONE; - } else - info("attempting cleanup"); - pthread_cond_signal(&job->state_cond); - pthread_mutex_unlock(&job->state_mutex); - suddendeath = true; + info("forcing termination"); + pthread_kill(job->ioid, SIGTERM); } } break; default: - _fwd_signal(job, signo); + if (job->state != SRUN_JOB_OVERDONE) + _fwd_signal(job, signo); + else if (signo == SIGQUIT) { + info("forcing termination"); + pthread_kill(job->ioid, SIGTERM); + } break; } } @@ -726,12 +732,9 @@ _run_batch_job(void) job.env_size++; job.script = job_script; - if (opt.efname) - job.err = opt.efname; - if (opt.ifname) - job.in = opt.ifname; - if (opt.ofname) - job.out = opt.ofname; + job.err = opt.efname; + job.in = opt.ifname; + job.out = opt.ofname; job.work_dir = opt.cwd; retries = 0; -- GitLab