-
Mark A. Grondona authored
tty setup needs to occur before child tasks block waiting for a signal from the parent, so move this code out of exec_task() into fork_all_tasks() so that the wait-for-signal-from-parent code can also later move out of exec_task().
Mark A. Grondona authored: tty setup needs to occur before child tasks block waiting for a signal from the parent, so move this code out of exec_task() into fork_all_tasks() so that the wait-for-signal-from-parent code can also later move out of exec_task().
task.c 13.96 KiB
/*****************************************************************************\
* slurmd/slurmstepd/task.c - task launching functions for slurmstepd
*****************************************************************************
* Copyright (C) 2002-2007 The Regents of the University of California.
* Copyright (C) 2008-2009 Lawrence Livermore National Security.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Mark A. Grondona <mgrondona@llnl.gov>.
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.schedmd.com/slurmdocs/>.
* Please also read the included file: DISCLAIMER.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#if HAVE_CONFIG_H
# include "config.h"
#endif
#include <assert.h>
#include <ctype.h>
#include <fcntl.h>
#include <grp.h>
#include <pwd.h>
#include <string.h>
#include <sys/param.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#if HAVE_STDLIB_H
# include <stdlib.h>
#endif
#if HAVE_SYS_TYPES_H
# include <sys/types.h>
#endif
#ifdef HAVE_AIX
# include <sys/checkpnt.h>
#endif
#include <sys/resource.h>
#include "slurm/slurm_errno.h"
#include "src/common/checkpoint.h"
#include "src/common/env.h"
#include "src/common/fd.h"
#include "src/common/log.h"
#include "src/common/mpi.h"
#include "src/common/plugstack.h"
#include "src/slurmd/common/proctrack.h"
#include "src/common/switch.h"
#include "src/slurmd/common/task_plugin.h"
#include "src/common/xsignal.h"
#include "src/common/xstring.h"
#include "src/common/xmalloc.h"
#include "src/slurmd/slurmd/slurmd.h"
#include "src/slurmd/slurmstepd/io.h"
#include "src/slurmd/slurmstepd/pdebug.h"
#include "src/slurmd/slurmstepd/task.h"
#include "src/slurmd/slurmstepd/ulimits.h"
/*
* Static prototype definitions.
*
* Forward declarations of file-local helpers. exec_task() calls
* _make_tmpdir() and _uint32_array_to_str() before their definitions
* appear later in this file; the other two are declared here as well
* so that every helper has a visible prototype.
*/
static void _make_tmpdir(slurmd_job_t *job);
static int _run_script_and_set_env(const char *name, const char *path,
slurmd_job_t *job);
static void _proc_stdout(char *buf, char ***env);
static char *_uint32_array_to_str(int array_len, const uint32_t *array);
/*
 * Process TaskProlog output, one line at a time:
 *   "export NAME=value"  adds (or replaces) an environment variable
 *   "unset NAME"         clears an environment variable
 *   "print <whatever>"   writes <whatever> to the job's stdout
 * Any other line is ignored.
 *
 * buf IN:     NUL-terminated script output. It is modified temporarily
 *             while each line is parsed and restored afterwards.
 * env IN/OUT: environment array updated by "export" and "unset" lines.
 *
 * Every scan is confined to the current line: a keyword followed only by
 * blanks can never pick up a name from the following line, and a final
 * line without a newline never causes the string terminator to be
 * written to stdout.
 */
static void _proc_stdout(char *buf, char ***env)
{
	char *line = buf;

	while (line[0]) {
		char *end_line, *name_ptr, *name_end, *val_ptr, *equal_ptr;
		char saved;
		bool end_buf;
		int len;

		/* Terminate the current line so no scan can run past it */
		end_line = strchr(line, '\n');
		end_buf = (end_line == NULL);
		if (end_buf)
			end_line = line + strlen(line);
		else
			end_line[0] = '\0';

		if (!strncmp(line, "print ", 6)) {
			val_ptr = line + 6;
			while (isspace((unsigned char) val_ptr[0]))
				val_ptr++;
			/* Echo the line's newline if it had one, but never
			 * the string terminator */
			len = end_line - val_ptr;
			if (!end_buf) {
				end_line[0] = '\n';
				len++;
			}
			/* safe_write() jumps to rwfail on a write error */
			safe_write(1, val_ptr, len);
		} else if (!strncmp(line, "export ", 7)) {
			name_ptr = line + 7;
			while (isspace((unsigned char) name_ptr[0]))
				name_ptr++;
			equal_ptr = strchr(name_ptr, '=');
			if (!equal_ptr || (equal_ptr == name_ptr))
				goto rwfail;	/* no '=' or empty name */
			val_ptr = equal_ptr + 1;
			/* Drop blanks between the name and the '=' */
			name_end = equal_ptr;
			while ((name_end > name_ptr) &&
			       isspace((unsigned char) name_end[-1]))
				name_end--;
			saved = name_end[0];
			name_end[0] = '\0';
			debug("export name:%s:val:%s:", name_ptr, val_ptr);
			if (setenvf(env, name_ptr, "%s", val_ptr)) {
				error("Unable to set %s environment variable",
				      name_ptr);
			}
			name_end[0] = saved;
		} else if (!strncmp(line, "unset ", 6)) {
			name_ptr = line + 6;
			while (isspace((unsigned char) name_ptr[0]))
				name_ptr++;
			if (name_ptr[0] == '\0')
				goto rwfail;	/* no name on this line */
			/* Drop trailing blanks from the name */
			name_end = end_line;
			while ((name_end > name_ptr) &&
			       isspace((unsigned char) name_end[-1]))
				name_end--;
			saved = name_end[0];
			name_end[0] = '\0';
			debug(" unset name:%s:", name_ptr);
			unsetenvp(*env, name_ptr);
			name_end[0] = saved;
		}

rwfail:		/* parse error or failed write: process rest of output */
		if (end_buf)
			break;
		end_line[0] = '\n';	/* restore the line separator */
		line = end_line + 1;
	}
}
/*
 * Run a task prolog script. Also read the stdout of the script and set
 * environment variables in the task's environment as specified
 * in the script's standard output (see _proc_stdout()).
 * At most sizeof(buf)-1 bytes of the script's output are processed.
 *
 * name IN: class of program ("system prolog", "user prolog", etc.)
 * path IN: pathname of program to run; NULL or "" means nothing to run
 * job IN/OUT: pointer to associated job, can update job->env
 *	if prolog
 * RET 0 if there is nothing to run, the script's wait status as
 *     reported by waitpid() once it has been reaped, or -1 if it could
 *     not be started or reaped.
 */
static int
_run_script_and_set_env(const char *name, const char *path, slurmd_job_t *job)
{
	int status, rc;
	ssize_t nread;
	pid_t cpid;
	int pfd[2];
	size_t offset = 0;
	char buf[4096];

	xassert(job->env);
	if (path == NULL || path[0] == '\0')
		return 0;

	debug("[job %u] attempting to run %s [%s]", job->jobid, name, path);

	if (access(path, R_OK | X_OK) < 0) {
		error("Could not run %s [%s]: %m", name, path);
		return -1;
	}
	if (pipe(pfd) < 0) {
		error("executing %s: pipe: %m", name);
		return -1;
	}
	if ((cpid = fork()) < 0) {
		error("executing %s: fork: %m", name);
		close(pfd[0]);
		close(pfd[1]);
		return -1;
	}
	if (cpid == 0) {
		char *argv[2];

		argv[0] = xstrdup(path);
		argv[1] = NULL;

		/* script's stdout goes to the pipe; stdin/stderr closed */
		close(1);
		if(dup(pfd[1]) == -1)
			error("couldn't do the dup: %m");
		close(2);
		close(0);
		close(pfd[0]);
		close(pfd[1]);
		/* own process group, so killpg() below reaches its children */
#ifdef SETPGRP_TWO_ARGS
		setpgrp(0, 0);
#else
		setpgrp();
#endif
		execve(path, argv, job->env);
		error("execve(): %m");
		exit(127);
	}

	close(pfd[1]);

	/* Keep one byte free so the output can always be NUL-terminated
	 * before it is parsed */
	while (offset < sizeof(buf) - 1) {
		nread = read(pfd[0], buf + offset, sizeof(buf) - 1 - offset);
		if (nread < 0) {
			if (errno == EINTR)
				continue;
			error("executing %s: read: %m", name);
			break;
		}
		if (nread == 0)
			break;	/* EOF */
		offset += nread;
	}
	buf[offset] = '\0';
	/* debug ("read %d:%s:", (int) offset, buf); */
	_proc_stdout(buf, &job->env);
	close(pfd[0]);

	while (1) {
		rc = waitpid(cpid, &status, 0);
		if (rc < 0) {
			if (errno == EINTR)
				continue;
			error("waitpid: %m");
			return -1;
		}
		killpg(cpid, SIGKILL);	/* kill children too */
		return status;
	}
	/* NOTREACHED */
}
/*
 * Given a program name, translate it to a pathname as needed, based upon
 * the PATH environment variable of prog_env (in the manner of execvp(3)).
 *
 * fname    IN: program name; only the text before the first white space
 *              character is used
 * prog_env IN: NULL-terminated environment array searched for "PATH="
 * RET an xmalloc'ed string (free with xfree()):
 *     - the name itself if it is empty, contains a '/', PATH is not set,
 *       or no PATH directory contains it;
 *     - otherwise "<dir>/<name>" for the first PATH directory in which
 *       stat() succeeds.
 */
static char *
_build_path(char* fname, char **prog_env)
{
	char *file_name, *file_path = NULL, *path_env = NULL;
	char *dir, *save_ptr = NULL;
	struct stat stat_buf;
	int i;

	/* make copy of file name (end at white space), no length limit */
	file_name = xstrdup(fname);
	for (i = 0; file_name[i]; i++) {
		if (isspace((unsigned char) file_name[i])) {
			file_name[i] = '\0';
			break;
		}
	}

	/* Like execvp(): names containing a slash (absolute, or relative
	 * such as "./a.out") are used as-is and never searched in PATH */
	if ((file_name[0] == '\0') || strchr(file_name, '/'))
		return file_name;

	for (i = 0; prog_env[i]; i++) {
		if (!strncmp(prog_env[i], "PATH=", 5)) {
			path_env = xstrdup(&prog_env[i][5]);
			break;
		}
	}
	if (path_env == NULL)
		return file_name;

	/* search each PATH directory; strtok_r() keeps no hidden state */
	for (dir = strtok_r(path_env, ":", &save_ptr); dir;
	     dir = strtok_r(NULL, ":", &save_ptr)) {
		xstrfmtcat(file_path, "%s/%s", dir, file_name);
		if (stat(file_path, &stat_buf) == 0)
			break;
		xfree(file_path);	/* resets file_path to NULL */
	}
	xfree(path_env);

	if (file_path == NULL)		/* not found */
		return file_name;
	xfree(file_name);
	return file_path;
}
/*
 * Describe local task ltaskid of this step to the MPI plugin and let the
 * plugin adjust the task's environment (job->env).
 * RET the value returned by mpi_hook_slurmstepd_task().
 */
static int
_setup_mpi(slurmd_job_t *job, int ltaskid)
{
	mpi_plugin_task_info_t task_info;
	slurmd_task_info_t *t = job->task[ltaskid];

	/* step-wide identity and layout */
	task_info.jobid   = job->jobid;
	task_info.stepid  = job->stepid;
	task_info.nnodes  = job->nnodes;
	task_info.nodeid  = job->nodeid;
	task_info.ntasks  = job->ntasks;
	task_info.ltasks  = job->node_tasks;

	/* this particular task */
	task_info.gtaskid = t->gtid;
	task_info.ltaskid = t->id;

	/* addresses from the step's environment template */
	task_info.self    = job->envtp->self;
	task_info.client  = job->envtp->cli;

	return mpi_hook_slurmstepd_task(&task_info, &job->env);
}
/*
 * Prepare local task number i of the step and exec it. This function
 * never returns: either execve() (or a checkpoint restart) replaces the
 * process, or it exits with a non-zero status.
 *
 * Current process is running as the user when this is called.
 *
 * job    IN/OUT: step being launched; job->env is rebuilt here
 * i      IN:     local task index into job->task[]
 * waitfd IN:     one byte is read from it before continuing, which
 *                stalls the exec (see comment below); closed afterwards
 */
void
exec_task(slurmd_job_t *job, int i, int waitfd)
{
	char c;
	uint32_t *gtids;		/* pointer to array of ranks */
	int fd, j;
	int rc;
	int saved_errno;
	slurmd_task_info_t *task = job->task[i];

	if (i == 0)
		_make_tmpdir(job);

	/*
	 * Stall exec until all tasks have joined the same process group
	 */
	if ((rc = read (waitfd, &c, sizeof (c))) != 1) {
		error ("_exec_task read failed, fd = %d, rc=%d: %m", waitfd, rc);
		log_fini();
		exit(1);
	}
	close(waitfd);

	/* comma-separated global task ids of all tasks on this node */
	gtids = xmalloc(job->node_tasks * sizeof(uint32_t));
	for (j = 0; j < job->node_tasks; j++)
		gtids[j] = job->task[j]->gtid;
	job->envtp->sgtids = _uint32_array_to_str(job->node_tasks, gtids);
	xfree(gtids);

	job->envtp->jobid = job->jobid;
	job->envtp->stepid = job->stepid;
	job->envtp->nodeid = job->nodeid;
	job->envtp->cpus_on_node = job->cpus;
	job->envtp->env = job->env;
	job->envtp->procid = task->gtid;
	job->envtp->localid = task->id;
	job->envtp->task_pid = getpid();
	job->envtp->distribution = job->task_dist;
	job->envtp->cpu_bind = xstrdup(job->cpu_bind);
	job->envtp->cpu_bind_type = job->cpu_bind_type;
	job->envtp->mem_bind = xstrdup(job->mem_bind);
	job->envtp->mem_bind_type = job->mem_bind_type;
	/* NOTE(review): overrides the task_dist value assigned above */
	job->envtp->distribution = -1;
	job->envtp->ckpt_dir = xstrdup(job->ckpt_dir);
	job->envtp->batch_flag = job->batch;
	setup_env(job->envtp, false);
	setenvf(&job->envtp->env, "SLURMD_NODENAME", "%s", conf->node_name);
	job->env = job->envtp->env;
	job->envtp->env = NULL;

	xfree(job->envtp->task_count);

	if (task->argv[0] && *task->argv[0] != '/') {
		/*
		 * Normally the client (srun) expands the command name
		 * to a fully qualified path, but in --multi-prog mode it
		 * is left up to the server to search the PATH for the
		 * executable.
		 */
		task->argv[0] = _build_path(task->argv[0], job->env);
	}

	if (!job->batch) {
		if (interconnect_attach(job->switch_job, &job->env,
				job->nodeid, (uint32_t) i, job->nnodes,
				job->ntasks, task->gtid) < 0) {
			error("Unable to attach to interconnect: %m");
			log_fini();
			exit(1);
		}

		if (_setup_mpi(job, i) != SLURM_SUCCESS) {
			error("Unable to configure MPI plugin: %m");
			log_fini();
			exit(1);
		}
	}

	io_dup_stdio(task);

	/* task-specific pre-launch activities */

	if (spank_user_task (job, i) < 0) {
		error ("Failed to invoke task plugin stack");
		exit (1);
	}

	/* task plugin hook */
	if (pre_launch(job)) {
		error ("Failed task affinity setup");
		exit (1);
	}

	if (conf->task_prolog) {
		char *my_prolog;
		slurm_mutex_lock(&conf->config_mutex);
		my_prolog = xstrdup(conf->task_prolog);
		slurm_mutex_unlock(&conf->config_mutex);
		_run_script_and_set_env("slurm task_prolog",
					my_prolog, job);
		xfree(my_prolog);
	}
	if (job->task_prolog) {
		_run_script_and_set_env("user task_prolog",
					job->task_prolog, job);
	}

	if (!job->batch)
		pdebug_stop_current(job);
	if (job->env == NULL) {
		debug("job->env is NULL");
		job->env = (char **)xmalloc(sizeof(char *));
		job->env[0] = (char *)NULL;
	}

	if (job->restart_dir) {
		info("restart from %s", job->restart_dir);
		/* no return on success */
		checkpoint_restart_task(job, job->restart_dir, task->gtid);
		/* capture the failure before error() can modify errno */
		saved_errno = errno;
		error("Restart task failed: %m");
		exit(saved_errno);
	}

	if (task->argv[0] == NULL) {
		error("No executable program specified for this task");
		exit(2);
	}

	/* Do this last so you don't worry too much about the users
	   limits including the slurmstepd in with it.
	*/
	if (set_user_limits(job) < 0) {
		debug("Unable to set user limits");
		log_fini();
		exit(5);
	}

	execve(task->argv[0], task->argv, job->env);
	/* keep execve()'s errno: open/read/error() below may change it */
	saved_errno = errno;

	/*
	 * print error message and clean up if execve() returns:
	 */
	if ((saved_errno == ENOENT) &&
	    ((fd = open(task->argv[0], O_RDONLY)) >= 0)) {
		char buf[256], *eol;
		int sz;

		/* leave room so the data can always be NUL-terminated */
		sz = read(fd, buf, sizeof(buf) - 1);
		close(fd);
		if ((sz >= 3) && (strncmp(buf, "#!", 2) == 0)) {
			buf[sz] = '\0';
			eol = strchr(buf, '\n');
			if (eol)
				eol[0] = '\0';
			errno = saved_errno;	/* for %m */
			error("execve(): bad interpreter(%s): %m", buf+2);
			exit(saved_errno);
		}
	}
	errno = saved_errno;	/* for %m */
	error("execve(): %s: %m", task->argv[0]);
	exit(saved_errno);
}
/*
 * Make sure the task's TMPDIR is usable.
 *  - TMPDIR unset: set it to "/tmp".
 *  - TMPDIR set: try to create it (mode 0700). An already existing
 *    directory is kept if it is world-writable, or owned by job->uid
 *    with owner write permission.
 *  - Otherwise log an error and fall back to "/tmp".
 */
static void
_make_tmpdir(slurmd_job_t *job)
{
	struct stat st;
	char *tmpdir = getenvp(job->env, "TMPDIR");

	if (tmpdir == NULL) {
		/* task may want it set */
		setenvf(&job->env, "TMPDIR", "/tmp");
		return;
	}

	if (mkdir(tmpdir, 0700) == 0)
		return;

	/* Reuse an existing directory only if the user can write there */
	if ((errno == EEXIST) &&
	    (stat(tmpdir, &st) == 0) &&
	    S_ISDIR(st.st_mode) &&
	    ((st.st_mode & S_IWOTH) ||
	     ((st.st_uid == job->uid) && (st.st_mode & S_IWUSR))))
		return;

	error("Unable to create TMPDIR [%s]: %m", tmpdir);
	error("Setting TMPDIR to /tmp");
	setenvf(&job->env, "TMPDIR", "/tmp");
}
/*
 * Render an array of uint32_t values as decimal numbers joined by
 * commas, e.g. {1, 2, 3} -> "1,2,3". A NULL array or a non-positive
 * length yields the empty string.
 *
 * Returns an xmalloc'ed string. Free with xfree().
 */
static char *_uint32_array_to_str(int array_len, const uint32_t *array)
{
	char *result = xstrdup("");
	int idx;

	if (array == NULL)
		return result;

	/* every element after the first is preceded by a separator */
	for (idx = 0; idx < array_len; idx++)
		xstrfmtcat(result, "%s%u", (idx > 0) ? "," : "", array[idx]);

	return result;
}