From c9d7896602e9641d433eff819ff36827464dbe4e Mon Sep 17 00:00:00 2001 From: Mark Grondona <mgrondona@llnl.gov> Date: Wed, 26 Feb 2003 18:30:58 +0000 Subject: [PATCH] o Reorganize slurmd job management functionality so that job/job step session leader runs as owner of job and not root. Split "session manger" code from mgr.c (which runs as root) into smgr.c (most session manager code runs as user) o reorganize interconnect functions (see interconnect.h) to fit new dual-process model for slurmd. o add pipe to job structure for communication between slurmd job manager and session manager. o add "mpid" field to shared memory for job steps and shm_update_step_mpid() to update this information. o add ulimits.[ch] to support setting user limits for jobs based off SLURM_RLIMIT* env vars. --- src/slurmd/Makefile.am | 2 + src/slurmd/elan_interconnect.c | 46 +- src/slurmd/interconnect.h | 55 +- src/slurmd/io.c | 10 +- src/slurmd/job.c | 10 + src/slurmd/job.h | 55 +- src/slurmd/mgr.c | 1065 +++++++++++++++++--------------- src/slurmd/no_interconnect.c | 14 +- src/slurmd/req.c | 6 +- src/slurmd/shm.c | 37 +- src/slurmd/shm.h | 32 +- src/slurmd/slurmd.c | 4 +- src/slurmd/smgr.c | 410 ++++++++++++ src/slurmd/smgr.h | 64 ++ src/slurmd/ulimits.c | 139 +++++ src/slurmd/ulimits.h | 38 ++ 16 files changed, 1425 insertions(+), 562 deletions(-) create mode 100644 src/slurmd/smgr.c create mode 100644 src/slurmd/smgr.h create mode 100644 src/slurmd/ulimits.c create mode 100644 src/slurmd/ulimits.h diff --git a/src/slurmd/Makefile.am b/src/slurmd/Makefile.am index 7fc58030902..84514f662dc 100644 --- a/src/slurmd/Makefile.am +++ b/src/slurmd/Makefile.am @@ -27,6 +27,7 @@ common_sources = \ slurmd.c slurmd.h \ req.c req.h \ mgr.c mgr.h \ + smgr.c smgr.h \ get_mach_stat.c \ get_mach_stat.h \ read_proc.c \ @@ -35,6 +36,7 @@ common_sources = \ semaphore.c semaphore.h \ shm.c shm.h \ fname.c fname.h \ + ulimits.c ulimits.h \ setenvpf.c setenvpf.h \ interconnect.h diff --git a/src/slurmd/elan_interconnect.c b/src/slurmd/elan_interconnect.c index 25f1211735f..5c0bdeaa5f2 100644 --- a/src/slurmd/elan_interconnect.c +++ b/src/slurmd/elan_interconnect.c @@ -46,19 +46,12 @@ #include "src/slurmd/shm.h" static int -_wait_and_destroy_prg(qsw_jobinfo_t qsw_job, pid_t pid) +_wait_and_destroy_prg(qsw_jobinfo_t qsw_job) { int i = 0; int sleeptime = 1; - debug3("waiting to destory program description..."); - again: - if (waitpid(pid, NULL, 0) < 0) { - if (errno == EINTR) - goto again; - error("waitpid: %m"); - exit(1); - } + debug3("going to destory program description..."); while(qsw_prgdestroy(qsw_job) < 0) { i++; @@ -78,8 +71,12 @@ _wait_and_destroy_prg(qsw_jobinfo_t qsw_job, pid_t pid) } debug("destroyed program description"); + return SLURM_SUCCESS; +} - exit(0); +int +interconnect_preinit(slurmd_job_t *job) +{ return SLURM_SUCCESS; } @@ -89,27 +86,11 @@ _wait_and_destroy_prg(qsw_jobinfo_t qsw_job, pid_t pid) int interconnect_init(slurmd_job_t *job) { - pid_t pid; - - /* Process 1: */ - switch ((pid = fork())) - { - case -1: - error ("elan_interconnect_prepare fork(): %m"); - return SLURM_ERROR ; - case 0: /* child falls thru */ - break; - default: /* parent */ - _wait_and_destroy_prg(job->qsw_job, pid); - /*NOTREACHED*/ - } - - /* Process 2: */ - debug("calling qsw_prog_init from process %ld", getpid()); + debug2("calling interconnect_init from process %ld", getpid()); if (qsw_prog_init(job->qsw_job, job->uid) < 0) { error ("elan interconnect_init: qsw_prog_init: %m"); /* we may lose the following info if not logging to stderr 
*/ - qsw_print_jobinfo(stderr, job->qsw_job); + qsw_print_jobinfo(log_fp(), job->qsw_job); return SLURM_ERROR; } @@ -119,8 +100,17 @@ interconnect_init(slurmd_job_t *job) int interconnect_fini(slurmd_job_t *job) { + qsw_prog_fini(job->qsw_job); + return SLURM_SUCCESS; +} + +int +interconnect_postfini(slurmd_job_t *job) +{ + _wait_and_destroy_prg(job->qsw_job); return SLURM_SUCCESS; } + int interconnect_attach(slurmd_job_t *job, int procid) { diff --git a/src/slurmd/interconnect.h b/src/slurmd/interconnect.h index e9984cd8b48..c156fc7562a 100644 --- a/src/slurmd/interconnect.h +++ b/src/slurmd/interconnect.h @@ -32,23 +32,74 @@ #include "src/common/slurm_protocol_api.h" #include "src/slurmd/job.h" +/* + * Notes: + * + * Interconnect functions are run within slurmd in the following way: + * (Diagram courtesy of Jim Garlick [see qsw.c] ) + * + * Process 1 (root) Process 2 (root, user) | Process 3 (user task) + * | + * interconnect_preinit | + * fork ------------------ interconnect_init | + * waitpid setuid, chdir, etc. | + * fork N procs -----------+--- interconnect_attach + * wait all | interconnect_env + * | exec mpi process + * interconnect_fini* | + * interconnect_postfini | + * | + * + * [ *Note: interconnect_fini() is run as the uid of the job owner, not root ] + */ + + +/* + * Prepare node for job. + * + * pre is run as root in the first slurmd process, the so called job + * manager. This function can be used to perform any initialization + * that needs to be performed in the same process as interconnect_fini() + * + */ +int interconnect_preinit(slurmd_job_t *job); + /* - * initialize interconnect on node + * initialize interconnect on node. This function is run from the + * 2nd slurmd process (some interconnect implementations may require + * interconnect init functions to be executed from a separate process + * than the process executing initerconnect_fini() [e.g. QsNet]) + * */ int interconnect_init(slurmd_job_t *job); /* - * finalize and detach from interconnect on node + * This function is run from the same process as interconnect_init() + * after all job tasks have exited. It is *not* run as root, because + * the process in question has already setuid to the job owner. + * */ int interconnect_fini(slurmd_job_t *job); +/* + * Finalize interconnect on node. + * + * This function is run from the initial slurmd process (same process + * as interconnect_preinit()), and is run as root. Any cleanup routines + * that need to be run with root privileges should be run from this + * function. + */ +int interconnect_postfini(slurmd_job_t *job); + /* * attach process to interconnect + * */ int interconnect_attach(slurmd_job_t *job, int taskid); /* * Set environment variables needed. + * */ int interconnect_env(slurmd_job_t *job, int taskid); diff --git a/src/slurmd/io.c b/src/slurmd/io.c index 27511531d11..5d1ed247d45 100644 --- a/src/slurmd/io.c +++ b/src/slurmd/io.c @@ -445,6 +445,8 @@ _io_add_connecting(slurmd_job_t *job, task_info_t *t, srun_info_t *srun, io_obj_t *obj = NULL; int sock = -1; + debug3("in io_add_connecting"); + if ((sock = (int) slurm_open_stream(&srun->ioaddr)) < 0) { error("connect io: %m"); /* XXX retry or silently fail? 
@@ -470,6 +472,8 @@ _io_add_connecting(slurmd_job_t *job, task_info_t *t, srun_info_t *srun, list_append(job->objs, (void *)obj); + debug3("Now handling %d IO objects", list_count(job->objs)); + return SLURM_SUCCESS; } @@ -501,7 +505,7 @@ _io_prepare_one(slurmd_job_t *j, task_info_t *t, srun_info_t *s) } if (!list_find_first(t->srun_list, (ListFindF) find_obj, s)) { - debug("appending new client to srun_list for task %d", t->gid); + debug3("appending new client to srun_list for task %d", t->gid); list_append(t->srun_list, (void *) s); } @@ -543,7 +547,9 @@ io_prepare_clients(slurmd_job_t *job) return SLURM_FAILURE; /* kick IO thread */ - pthread_kill(job->ioid, SIGHUP); + debug3("sending sighup to io thread id %ld", job->ioid); + if (pthread_kill(job->ioid, SIGHUP) < 0) + error("pthread_kill: %m"); } return SLURM_SUCCESS; diff --git a/src/slurmd/job.c b/src/slurmd/job.c index dcfa6b9b2d6..7209cca8ab7 100644 --- a/src/slurmd/job.c +++ b/src/slurmd/job.c @@ -153,6 +153,11 @@ job_create(launch_tasks_request_msg_t *msg, slurm_addr *cli_addr) _job_init_task_info(job, msg->global_task_ids); + if (pipe(job->fdpair) < 0) { + error("pipe: %m"); + return NULL; + } + return job; } @@ -209,6 +214,11 @@ job_batch_job_create(batch_job_launch_msg_t *msg) */ job->argv = (char **) xmalloc(job->argc * sizeof(char *)); + if (pipe(job->fdpair) < 0) { + error("pipe: %m"); + return NULL; + } + _job_init_task_info(job, &global_taskid); return job; diff --git a/src/slurmd/job.h b/src/slurmd/job.h index 7cbb03aa219..ba3d9614a50 100644 --- a/src/slurmd/job.h +++ b/src/slurmd/job.h @@ -84,31 +84,40 @@ typedef struct srun_info { } srun_info_t; typedef struct slurmd_job { - uint32_t jobid; - uint32_t stepid; - uint32_t nnodes; - uint32_t nprocs; - uint32_t nodeid; - uint32_t ntasks; - uint32_t debug; - uint16_t envc; - uint16_t argc; - bool batch; - bool run_prolog; /* need to run prolog */ - char **env; - char **argv; - char *cwd; + uint32_t jobid; /* Current SLURM job id */ + uint32_t stepid; /* Current step id (or NO_VAL) */ + uint32_t nnodes; /* number of nodes in current job */ + uint32_t nprocs; /* total number of processes in current job */ + uint32_t nodeid; /* relative position of this node in job */ + uint32_t ntasks; /* number of tasks on *this* node */ + uint32_t debug; /* debug level for job slurmd */ + uint16_t envc; /* Environment variable count */ + uint16_t argc; /* number of commandline arguments */ + char **env; /* job environment */ + char **argv; /* job argument vector */ + char *cwd; /* path to current working directory */ #ifdef HAVE_LIBELAN3 - qsw_jobinfo_t qsw_job; + qsw_jobinfo_t qsw_job; /* Elan-specific job information */ #endif - uid_t uid; - struct passwd *pwd; - time_t timelimit; - task_info_t **task; - List objs; - List sruns; - pthread_t ioid; - uint16_t task_flags; + uid_t uid; /* user id for job */ + + bool batch; /* true if this is a batch job */ + bool run_prolog; /* true if need to run prolog */ + time_t timelimit; /* time at which job must stop */ + + struct passwd *pwd; /* saved passwd struct for user job */ + task_info_t **task; /* list of task information pointers */ + List objs; /* list of IO objects */ + List sruns; /* List of sruns */ + pthread_t ioid; /* pthread id of IO thread */ + + pid_t jmgr_pid; /* job manager pid */ + pid_t smgr_pid; /* session manager pid */ + + int fdpair[2]; /* file descriptor pair for */ + /* communication between slurmds */ + + uint16_t task_flags; } slurmd_job_t; diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c index 84e60fc5cc4..a412cbad52d 
100644 --- a/src/slurmd/mgr.c +++ b/src/slurmd/mgr.c @@ -57,90 +57,577 @@ #include "src/common/xstring.h" #include "src/common/xmalloc.h" +#include "src/slurmd/mgr.h" + #include "src/slurmd/slurmd.h" #include "src/slurmd/setenvpf.h" -#include "src/slurmd/mgr.h" +#include "src/slurmd/smgr.h" #include "src/slurmd/io.h" #include "src/slurmd/shm.h" #include "src/slurmd/interconnect.h" -static int _run_job(slurmd_job_t *job); -static int _exec_all_tasks(slurmd_job_t *job); -static void _task_exec(slurmd_job_t *job, int i); + +/* + * Map session manager exit status to slurm errno: + * Keep in sync with smgr.c exit codes. + */ +static int exit_errno[] = +{ 0, + ESLURM_INTERCONNECT_FAILURE, + ESLURMD_SET_UID_OR_GID_ERROR, + ESLURMD_SET_SID_ERROR, + ESCRIPT_CHDIR_FAILED, + -1, + ESLURMD_EXECVE_FAILED +}; + +#define MAX_SMGR_EXIT_STATUS 6 + + + +/* + * Prototypes + */ + +/* + * Job manager related prototypes + */ +static int _job_mgr(slurmd_job_t *job); +static int _setup_io(slurmd_job_t *job); static int _drop_privileges(struct passwd *pwd); static int _reclaim_privileges(struct passwd *pwd); -static int _become_user(slurmd_job_t *job); -static int _unblock_all_signals(void); static int _block_most_signals(void); -static int _send_exit_msg(int rc, task_info_t *t); -static int _complete_job(slurmd_job_t *job, int rc, int status); static void _send_launch_resp(slurmd_job_t *job, int rc); -static void _wait_for_all_tasks(slurmd_job_t *job); static void _slurmd_job_log_init(slurmd_job_t *job); +static int _update_shm_task_info(slurmd_job_t *job); +static int _readn(int fd, void *buf, size_t nbytes); +static int _create_job_session(slurmd_job_t *job); +static int _wait_for_task_exit(slurmd_job_t *job); +static int _wait_for_session(slurmd_job_t *job); +static void _wait_for_io(slurmd_job_t *job); +static void _handle_attach_req(slurmd_job_t *job); +static int _send_exit_msg(slurmd_job_t *job, int tid[], int n, int status); + +static void _setargs(slurmd_job_t *job, char **argv, int argc); + +/* + * Batch job mangement prototypes: + */ +static char * _make_batch_dir(slurmd_job_t *job); +static char * _make_batch_script(batch_job_launch_msg_t *msg, char *path); +static int _setup_batch_env(slurmd_job_t *job, char *nodes); +static int _complete_job(slurmd_job_t *job, int err, int status); -static void -_setargs(slurmd_job_t *job, char **argv, int argc) + +/* SIGHUP (empty) signal handler + */ +static void _hup_handler(int sig) {;} + +/* + * Launch an job step on the current node + */ +int +mgr_launch_tasks(launch_tasks_request_msg_t *msg, slurm_addr *cli) { - int i; - size_t len = 0; - char *arg = NULL; + slurmd_job_t *job = NULL; + char buf[256]; - for (i = 0; i < argc; i++) - len += strlen(argv[i]) + 1; + snprintf(buf, sizeof(buf), "[%d.%d]", msg->job_id, msg->job_step_id); + log_set_fpfx(buf); - if (job->stepid == NO_VAL) - xstrfmtcat(arg, "[%d]", job->jobid); - else - xstrfmtcat(arg, "[%d.%d]", job->jobid, job->stepid); + if (!(job = job_create(msg, cli))) + return SLURM_ERROR; - if (len < (strlen(arg) + 7)) - goto done; + _setargs(job, *conf->argv, *conf->argc); - memset(argv[0], 0, len); - strncpy(argv[0], "slurmd", 6); - strncpy((*argv)+7, arg, strlen(arg)); + if (_job_mgr(job) < 0) + return SLURM_ERROR; - done: - xfree(arg); - return; + return SLURM_SUCCESS; } -/* Launch a job step on this node +/* + * Launch a batch job script on the current node */ int -mgr_launch_tasks(launch_tasks_request_msg_t *msg, slurm_addr *cli) +mgr_launch_batch_job(batch_job_launch_msg_t *msg, slurm_addr *cli) { + int rc 
= 0; + int status = 0; slurmd_job_t *job; - char buf[256]; + char *batchdir; + char buf[256]; - snprintf(buf, sizeof(buf), "[%d.%d]", msg->job_id, msg->job_step_id); + snprintf(buf, sizeof(buf), "[%d]", msg->job_id); log_set_fpfx(buf); - /* New process, so we must reinit shm */ - if (shm_init() < 0) - goto error; - - if (!(job = job_create(msg, cli))) - goto error; + if (!(job = job_batch_job_create(msg))) + goto cleanup; _setargs(job, *conf->argv, *conf->argc); - verbose("running job step %d.%d for %s", - job->jobid, job->stepid, job->pwd->pw_name); + if ((batchdir = _make_batch_dir(job)) == NULL) + goto cleanup1; + + xfree(job->argv[0]); + + if ((job->argv[0] = _make_batch_script(msg, batchdir)) == NULL) + goto cleanup2; + + if ((rc = _setup_batch_env(job, msg->nodes)) < 0) + goto cleanup2; + + status = _job_mgr(job); + + cleanup2: + if (job->argv[0] && (unlink(job->argv[0]) < 0)) + error("unlink(%s): %m", job->argv[0]); + cleanup1: + if (batchdir && (rmdir(batchdir) < 0)) + error("rmdir(%s): %m", batchdir); + xfree(batchdir); + cleanup : + verbose("job %d completed with slurm_rc = %d, job_rc = %d", + job->jobid, rc, status); + _complete_job(job, rc, status); + return 0; +} + + + +/* + * Run a prolog or epilog script. + * returns -1 on failure. + * + */ +int +run_script(bool prolog, const char *path, uint32_t jobid, uid_t uid) +{ + int status; + pid_t cpid; + char *name = prolog ? "prolog" : "epilog"; + + if (path == NULL || path[0] == '\0') + return 0; + + debug("[job %d] attempting to run %s [%s]", jobid, name, path); + + if (access(path, R_OK | X_OK) < 0) { + debug("Not running %s [%s]: %m", name, path); + return 0; + } + + if ((cpid = fork()) < 0) { + error ("executing %s: fork: %m", name); + return -1; + } + if (cpid == 0) { + char *argv[4]; + char **env; + int envc = 0; + - /* Run job's tasks and wait for all tasks to exit. + env = xmalloc(sizeof(char *)); + + argv[0] = xstrdup(path); + argv[1] = NULL; + + env[0] = NULL; + setenvpf(&env, &envc, "SLURM_JOBID=%u", jobid); + setenvpf(&env, &envc, "SLURM_UID=%u", uid); + + execve(path, argv, env); + error("help! %m"); + exit(127); + } + + do { + if (waitpid(cpid, &status, 0) < 0) { + if (errno != EINTR) + return -1; + } else + return status; + } while(1); + + /* NOTREACHED */ +} + + +static int +_setup_io(slurmd_job_t *job) +{ + int rc = 0; + struct passwd *spwd = NULL; + + /* + * Save current UID/GID */ - if (_run_job(job) < 0) - goto error; + if (!(spwd = getpwuid(geteuid()))) { + error("getpwuid: %m"); + return ESLURMD_IO_ERROR; + } - debug2("%ld returned from slurmd_run_job()", getpid()); - shm_fini(); - return(SLURM_SUCCESS); - error: + if (io_spawn_handler(job) < 0) + return ESLURMD_IO_ERROR; + + /* + * Initialize log facility to copy errors back to srun + */ + _slurmd_job_log_init(job); + + /* + * Temporarily drop permissions, initialize IO clients + * (open files/connections for IO, etc), then reclaim privileges. + */ + if (_drop_privileges(job->pwd) < 0) + return ESLURMD_SET_UID_OR_GID_ERROR; + + rc = io_prepare_clients(job); + + if (_reclaim_privileges(spwd) < 0) + error("sete{u/g}id(%ld/%ld): %m", spwd->pw_uid, spwd->pw_gid); + + if (rc < 0) + return ESLURMD_IO_ERROR; + + return SLURM_SUCCESS; +} + + +/* + * Send task exit message for n tasks. 
tid is the list of _local_ + * task ids that have exited + */ +static int +_send_exit_msg(slurmd_job_t *job, int tid[], int n, int status) +{ + int j; + slurm_msg_t resp; + task_exit_msg_t msg; + uint32_t gid[n]; + ListIterator i = NULL; + srun_info_t *srun = NULL; + + debug3("sending task exit msg for %d tasks", n); + + for (j = 0; j < n; j++) + gid[j] = job->task[tid[j]]->gid; + + msg.task_id_list = gid; + msg.num_tasks = n; + msg.return_code = status; + resp.data = &msg; + resp.msg_type = MESSAGE_TASK_EXIT; + + /* + * XXX: Should srun_list be associated with each task? + */ + i = list_iterator_create(job->task[tid[0]]->srun_list); + while ((srun = list_next(i))) { + resp.address = srun->resp_addr; + if (resp.address.sin_family != 0) + slurm_send_only_node_msg(&resp); + } + list_iterator_destroy(i); + + return SLURM_SUCCESS; +} + + +/* + * Executes the functions of the slurmd job manager process, + * which runs as root and performs shared memory and interconnect + * initialization, etc. + * + * Returns 0 if job ran and completed successfully. + * Returns errno if job startup failed. + * + */ +static int +_job_mgr(slurmd_job_t *job) +{ + int rc = 0; + + debug3("Entered job_mgr"); + + if (shm_init() < 0) + goto fail0; + + job_update_shm(job); + + if (!job->batch && (interconnect_preinit(job) < 0)) { + rc = ESLURM_INTERCONNECT_FAILURE; + goto fail1; + } + + _block_most_signals(); + + if ((rc = _setup_io(job))) + goto fail1; + + xsignal(SIGHUP, _hup_handler); + + /* + * Create slurmd session manager and read task pids from pipe + */ + if ((rc = _create_job_session(job))) { + /* + * Get exit code from session manager + */ + if (rc < 0) + rc = _wait_for_session(job); + goto fail2; + } + + /* + * Send job launch response with list of pids + */ + if (!job->batch) + _send_launch_resp(job, 0); + + /* + * Wait for all tasks to exit + */ + _wait_for_task_exit(job); + + /* wait for session to terminate, + * then clean up + */ + _wait_for_session(job); + + fail2: + /* + * Wait for io thread to complete + */ + _wait_for_io(job); + + if (!job->batch && (interconnect_postfini(job) < 0)) + error("interconnect_postfini: %m"); + fail1: + job_delete_shm(job); shm_fini(); - return(SLURM_ERROR); + fail0: + /* If interactive job startup was abnormal, + * be sure to notify client. 
+ */ + if ((rc != 0) && !job->batch) + _send_launch_resp(job, rc); + + return(rc); +} + +/* + * update task information from "job" into shared memory + */ +static int +_update_shm_task_info(slurmd_job_t *job) +{ + int retval = SLURM_SUCCESS; + int i; + + for (i = 0; i < job->ntasks; i++) { + task_t t; + + t.id = i; + t.global_id = job->task[i]->gid; + t.pid = job->task[i]->pid; + t.ppid = job->smgr_pid; + + if (shm_add_task(job->jobid, job->stepid, &t) < 0) { + error("shm_add_task: %m"); + retval = SLURM_ERROR; + } + } + + return retval; +} + +static int +_readn(int fd, void *buf, size_t nbytes) +{ + int n = 0; + char *pbuf = (char *) buf; + size_t nleft = nbytes; + + while (nleft > 0) { + if ((n = read(fd, (void *) pbuf, nleft)) > 0) { + pbuf+=n; + nleft-=n; + } else if (n == 0) /* EOF */ + break; + else if (errno == EINTR) + break; + else { + debug("read: %m"); + break; + } + } + return(n); +} + + +static int +_create_job_session(slurmd_job_t *job) +{ + int i; + int rc = 0; + int fd = job->fdpair[0]; + pid_t spid; + + if ((spid = smgr_create(job)) < (pid_t) 0) { + error("Unable to create session manager: %m"); + return ESLURMD_FORK_FAILED; + } + + job->jmgr_pid = getpid(); + if (shm_update_step_mpid(job->jobid, job->stepid, getpid()) < 0) + error("shm_update_step_mpid: %m"); + + job->smgr_pid = spid; + if (shm_update_step_sid(job->jobid, job->stepid, spid) < 0) + error("shm_update_step_sid: %m"); + + /* + * Read information from session manager slurmd + */ + for (i = 0; i < job->ntasks; i++) { + pid_t *pidptr = &job->task[i]->pid; + + if ((rc = _readn(fd, (void *) pidptr, sizeof(pid_t))) < 0) + error("Error obtaining task information: %m"); + + if (rc == 0) /* EOF, smgr must've died */ + goto error; + } + + _update_shm_task_info(job); + + return SLURM_SUCCESS; + + error: + rc = _wait_for_session(job); + return rc; +} + +static int +_handle_task_exit(slurmd_job_t *job) +{ + int len; + int tid[1]; + exit_status_t e; + + if ((len = _readn(job->fdpair[0], &e, sizeof(e))) < 0) { + error("read from session mgr: %m"); + return SLURM_ERROR; + } + + if (len == 0) /* EOF */ + return len; + + tid[0] = e.taskid; + + debug2("global task %d exited with status %d", tid[0], e.status); + + _send_exit_msg(job, tid, 1, e.status); + + return SLURM_SUCCESS; +} + +/* + * Wait for tasks to exit by reading task exit codes from slurmd + * session manager pipe. On EOF or when waiting == 0, the job is + * complete + */ +static int +_wait_for_task_exit(slurmd_job_t *job) +{ + int rc = 0; + int waiting = job->ntasks; + struct pollfd pfd[1]; + + pfd[0].fd = job->fdpair[0]; + pfd[0].events = POLLIN; + + while (waiting > 0) { + int revents; + + if ((rc = poll(pfd, 1, -1)) < 0) { + if (errno == EINTR) { + _handle_attach_req(job); + continue; + } + } + + revents = pfd[0].revents; + + if (revents & POLLNVAL) + return SLURM_ERROR; + + if ( (revents & POLLERR) + || (revents & POLLHUP) ) { + /* + * smgr exited. 
XXX: Needs work + */ + while (waiting && (_handle_task_exit(job) == 0)) { + waiting--; + } + if (waiting != 0) + return SLURM_ERROR; + else + return SLURM_SUCCESS; + } + + if ((revents & POLLIN) + && (_handle_task_exit(job) == SLURM_SUCCESS)) + waiting--; + } + + return SLURM_SUCCESS; +} + + +/* + * read task exit status from slurmd session manager process, + * then wait for session manager to terminate + */ +static int +_wait_for_session(slurmd_job_t *job) +{ + int status = -1; + pid_t pid; + + while ((pid = waitpid(job->smgr_pid, &status, 0)) < (pid_t) 0) { + if (errno == EINTR) + _handle_attach_req(job); + else { + error("waitpid: %m"); + break; + } + } + + status = WEXITSTATUS(status); + + return (status < MAX_SMGR_EXIT_STATUS) ? exit_errno[status] : status; } +/* + * Wait for IO + */ +static void +_wait_for_io(slurmd_job_t *job) +{ + debug("Waiting for IO"); + io_close_all(job); + + /* + * Wait until IO thread exits + */ + pthread_join(job->ioid, NULL); + + return; +} + + static char * _make_batch_dir(slurmd_job_t *job) { @@ -235,149 +722,35 @@ _setup_batch_env(slurmd_job_t *job, char *nodes) } -int -mgr_launch_batch_job(batch_job_launch_msg_t *msg, slurm_addr *cli) -{ - int rc = 0; - int status = 0; - slurmd_job_t *job; - char *batchdir; - char buf[256]; - - snprintf(buf, sizeof(buf), "[%d]", msg->job_id); - log_set_fpfx(buf); - - /* New process, so must reinit shm */ - if ((rc = shm_init()) < 0) - goto cleanup1; - - if (!(job = job_batch_job_create(msg))) - goto cleanup2; - - /* - * This is now done in _run_job() - */ - /* job_update_shm(job); */ - - _setargs(job, *conf->argv, *conf->argc); - - if ((batchdir = _make_batch_dir(job)) == NULL) - goto cleanup2; - - xfree(job->argv[0]); - - if ((job->argv[0] = _make_batch_script(msg, batchdir)) == NULL) - goto cleanup3; - - if ((rc = _setup_batch_env(job, msg->nodes)) < 0) - goto cleanup; - - status = _run_job(job); - - cleanup: - if (job->argv[0] && (unlink(job->argv[0]) < 0)) - error("unlink(%s): %m", job->argv[0]); - cleanup3: - if (batchdir && (rmdir(batchdir) < 0)) - error("rmdir(%s): %m", batchdir); - xfree(batchdir); - cleanup2: - shm_delete_step(job->jobid, job->stepid); - shm_fini(); - cleanup1: - verbose("job %d completed with slurm_rc = %d, job_rc = %d", - job->jobid, rc, status); - _complete_job(job, rc, status); - return 0; -} - -/* Instance of a slurmd "job" or job step: - * We run: - * interconnect_prepare() : prepare node for interconnect (if any) - * interconnect_init() : initialize interconnect on node - * fork() N tasks --> wait() --> interconnect_fini() - * \ - * `--> interconnect_attach() : attach each proc to interconnect - * interconnect_env() : setup child environment - * exec() - */ -static int -_run_job(slurmd_job_t *job) -{ - int rc = SLURM_SUCCESS; - struct passwd *spwd = getpwuid(geteuid()); - - _block_most_signals(); - - /* Insert job info into shared memory */ - job_update_shm(job); - - if (!job->batch && interconnect_init(job) == SLURM_ERROR) { - error("interconnect_init: %m"); - rc = errno; - goto fail; - } - - if ((rc = io_spawn_handler(job)) < 0) { - rc = ESLURMD_IO_ERROR; - goto fail1; - } - - /* connect job stderr to this node's task 0 stderr so - * user recieves error messages on stderr - */ - _slurmd_job_log_init(job); - - /* - * Temporarily drop permissions - */ - if ((rc = _drop_privileges(job->pwd)) < 0) { - rc = ESLURMD_SET_UID_OR_GID_ERROR; - goto fail2; - } - - /* Open input/output files and/or connections back to client - */ - rc = io_prepare_clients(job); - - if (_reclaim_privileges(spwd) < 
0) - error("sete{u/g}id(%ld/%ld): %m", spwd->pw_uid, spwd->pw_gid); +static void +_send_launch_resp(slurmd_job_t *job, int rc) +{ + int i; + slurm_msg_t resp_msg; + launch_tasks_response_msg_t resp; + srun_info_t *srun = list_peek(job->sruns); + debug("Sending launch resp rc=%d", rc); - if (rc < 0) { - rc = ESLURMD_IO_ERROR; - goto fail2; - } + resp_msg.address = srun->resp_addr; + resp_msg.data = &resp; + resp_msg.msg_type = RESPONSE_LAUNCH_TASKS; - rc = _exec_all_tasks(job); - if (!job->batch) - _send_launch_resp(job, rc); - _wait_for_all_tasks(job); + resp.node_name = conf->hostname; + resp.srun_node_id = job->nodeid; + resp.return_code = rc; + resp.count_of_pids = job->ntasks; - debug2("all tasks exited, waiting on IO"); - io_close_all(job); - pthread_join(job->ioid, NULL); - debug2("IO complete"); + resp.local_pids = xmalloc(job->ntasks * sizeof(*resp.local_pids)); + for (i = 0; i < job->ntasks; i++) + resp.local_pids[i] = job->task[i]->pid; - if (!job->batch) - interconnect_fini(job); /* ignore errors */ - job_delete_shm(job); /* again, ignore errors */ - verbose("job completed, rc = %d", rc); - return rc; + slurm_send_only_node_msg(&resp_msg); -fail2: - io_close_all(job); - pthread_join(job->ioid, NULL); -fail1: - if (!job->batch) - interconnect_fini(job); -fail: - job_delete_shm(job); - if (!job->batch) - _send_launch_resp(job, rc); - return rc; + xfree(resp.local_pids); } + static int _complete_job(slurmd_job_t *job, int err, int status) { @@ -434,6 +807,8 @@ _complete_job(slurmd_job_t *job, int err, int status) return SLURM_SUCCESS; } + + static void _handle_attach_req(slurmd_job_t *job) { @@ -456,37 +831,6 @@ _handle_attach_req(slurmd_job_t *job) io_new_clients(job); } -static void -_hup_handler(int sig) {;} - -static void -_wait_for_all_tasks(slurmd_job_t *job) -{ - int waiting = job->ntasks; - int i; - - xsignal(SIGHUP, _hup_handler); - - while (waiting > 0) { - int status; - pid_t pid = waitpid(0, &status, 0); - if ((pid < (pid_t) 0)) { - if (errno == EINTR) { - _handle_attach_req(job); - continue; - } - error("waitpid: %m"); - /* job_cleanup() */ - } - for (i = 0; i < job->ntasks; i++) { - if (job->task[i]->pid == pid) { - _send_exit_msg(status, job->task[i]); - waiting--; - } - } - } - return; -} static int _drop_privileges(struct passwd *pwd) @@ -542,204 +886,6 @@ _reclaim_privileges(struct passwd *pwd) } - - - -static int -_become_user(slurmd_job_t *job) -{ - if (setgid(job->pwd->pw_gid) < 0) { - error("setgid: %m"); - return -1; - } - - if (initgroups(job->pwd->pw_name, job->pwd->pw_gid) < 0) { - ; - /* error("initgroups: %m"); */ - } - - if (setuid(job->pwd->pw_uid) < 0) { - error("setuid: %m"); - return -1; - } - - return 0; -} - -static void -_task_exec(slurmd_job_t *job, int i) -{ - int rc; - log_options_t opts = LOG_OPTS_STDERR_ONLY; - - io_prepare_child(job->task[i]); - - /* - * Reinitialize slurm log facility to send errors back to client - */ - log_init("slurmd", opts, 0, NULL); - - if ((rc = _become_user(job)) < 0) - exit(rc); - - if (_unblock_all_signals() == SLURM_ERROR) { - error("unable to unblock signals"); - exit(1); - } - - /* attach to interconnect */ - if (!job->batch && (interconnect_attach(job, i) < 0)) { - error("interconnect attach failed: %m"); - exit(1); - } - - if (!job->batch && (interconnect_env(job, i) < 0)) { - error("interconnect_env: %m"); - } - - if (chdir(job->cwd) < 0) { - error("couldn't chdir to `%s': %m: going to /tmp instead", - job->cwd); - if (chdir("/tmp") < 0) { - error("couldn't chdir to /tmp either. 
dying."); - exit(1); - } - } - -#ifdef HAVE_TOTALVIEW - /* Stop the tasks on exec for TotalView to connect */ - if ((job->task_flags & TASK_TOTALVIEW_DEBUG) && - (ptrace(PTRACE_TRACEME, 0, NULL, NULL) == -1)) - error("ptrace: %m"); -#endif - - /* exec the cmdline */ - execve(job->argv[0], job->argv, job->env); - - /* error and clean up if execve() returns: - */ - error("execve(): %s: %m", job->argv[0]); - exit(errno); -} - -static int -_exec_all_tasks(slurmd_job_t *job) -{ - pid_t sid; - int i; - - debug3("%ld entered _launch_tasks", getpid()); - - xsignal(SIGPIPE, SIG_IGN); - - if ((sid = setsid()) < (pid_t) 0) { - error("setsid: %m"); - } - - _block_most_signals(); - - if (shm_update_step_sid(job->jobid, job->stepid, sid) < 0) - error("shm_update_step_sid: %m"); - - debug2("invoking %d tasks", job->ntasks); - - for (i = 0; i < job->ntasks; i++) { - task_t t; - debug2("going to fork task %d", i); - t.id = i; - t.global_id = job->task[i]->gid; - t.ppid = getpid(); - - if ((t.pid = fork()) < 0) { - error("fork: %m"); - return 1; - /* job_cleanup() */ - } else if (t.pid == 0) /* child */ - break; - - /* Parent continues loop: */ - - job->task[i]->pid = t.pid; - - debug2("%ld: forked child process %ld for task %d", - getpid(), (long) t.pid, i); - debug2("going to add task %d to shm", i); - if (shm_add_task(job->jobid, job->stepid, &t) < 0) - error("shm_add_task: %m"); - debug2("task %d added to shm", i); -#ifdef HAVE_TOTALVIEW - /* If task to be debugged, wait for it to stop via - * child's ptrace(PTRACE_TRACEME), then SIGSTOP, and - * ptrace(PTRACE_DETACH). This requires a kernel patch, - * which you probably already have in place for TotalView: - * http://hypermail.idiosynkrasia.net/linux-kernel/ - * archived/2001/week51/1193.html */ - if (job->task_flags & TASK_TOTALVIEW_DEBUG) { - int status; - waitpid(t.pid, &status, WUNTRACED); - if (kill(t.pid, SIGSTOP)) - error("kill %ld: %m", (long) t.pid); - if (ptrace(PTRACE_DETACH, (long) t.pid, NULL, NULL)) - error("ptrace %ld: %m", (long) t.pid); - } -#endif - - } - - if (i == job->ntasks) - return 0; /* _wait_for_all_tasks(job); */ - else - _task_exec(job, i); - - debug3("All tasks exited"); - return 0; -} - -static int -_send_exit_msg(int rc, task_info_t *t) -{ - slurm_msg_t resp; - task_exit_msg_t msg; - uint32_t task_id_list[1]; - ListIterator i; - srun_info_t *srun; - - debug3("sending task exit msg for %d", t->gid); - - /* FIXME:XXX: attempt to combine task IDs in single message */ - task_id_list[0] = t->gid; - msg.task_id_list = task_id_list; - msg.num_tasks = 1; - msg.return_code = rc; - resp.data = &msg; - resp.msg_type = MESSAGE_TASK_EXIT; - - i = list_iterator_create(t->srun_list); - while ((srun = list_next(i))) { - resp.address = srun->resp_addr; - if (resp.address.sin_family != 0) - slurm_send_only_node_msg(&resp); - } - list_iterator_destroy(i); - - return SLURM_SUCCESS; -} - -static int -_unblock_all_signals(void) -{ - sigset_t set; - if (sigfillset(&set)) { - error("sigfillset: %m"); - return SLURM_ERROR; - } - if (sigprocmask(SIG_UNBLOCK, &set, NULL)) { - error("sigprocmask: %m"); - return SLURM_ERROR; - } - return SLURM_SUCCESS; -} - static int _block_most_signals(void) { @@ -762,33 +908,6 @@ _block_most_signals(void) return SLURM_SUCCESS; } -static void -_send_launch_resp(slurmd_job_t *job, int rc) -{ - int i; - slurm_msg_t resp_msg; - launch_tasks_response_msg_t resp; - srun_info_t *srun = list_peek(job->sruns); - - debug("Sending launch resp rc=%d", rc); - - resp_msg.address = srun->resp_addr; - resp_msg.data = &resp; - 
resp_msg.msg_type = RESPONSE_LAUNCH_TASKS; - - resp.node_name = conf->hostname; - resp.srun_node_id = job->nodeid; - resp.return_code = rc; - resp.count_of_pids = job->ntasks; - - resp.local_pids = xmalloc(job->ntasks * sizeof(*resp.local_pids)); - for (i = 0; i < job->ntasks; i++) - resp.local_pids[i] = job->task[i]->pid; - - slurm_send_only_node_msg(&resp_msg); - - xfree(resp.local_pids); -} static void _slurmd_job_log_init(slurmd_job_t *job) @@ -819,54 +938,36 @@ _slurmd_job_log_init(slurmd_job_t *job) log_init(argv0, conf->log_opts, 0, NULL); } -int -run_script(bool prolog, const char *path, uint32_t jobid, uid_t uid) -{ - int status; - pid_t cpid; - char *name = prolog ? "prolog" : "epilog"; - - if (path == NULL || path[0] == '\0') - return 0; - - debug("[job %d] attempting to run %s [%s]", jobid, name, path); - - if (access(path, R_OK | X_OK) < 0) { - debug("Not running %s [%s]: %m", name, path); - return 0; - } - if ((cpid = fork()) < 0) { - error ("executing %s: fork: %m", name); - return -1; - } - if (cpid == 0) { - char *argv[4]; - char **env; - int envc = 0; +/* + * Attempt to change the cmdline argument list for slurmd + * to denote the job/job step that this process is managing. + */ +static void +_setargs(slurmd_job_t *job, char **argv, int argc) +{ + int i; + size_t len = 0; + char *arg = NULL; + for (i = 0; i < argc; i++) + len += strlen(argv[i]) + 1; - env = xmalloc(sizeof(char *)); + if (job->stepid == NO_VAL) + xstrfmtcat(arg, "[%d]", job->jobid); + else + xstrfmtcat(arg, "[%d.%d]", job->jobid, job->stepid); - argv[0] = xstrdup(path); - argv[1] = NULL; + if (len < (strlen(arg) + 7)) + goto done; - env[0] = NULL; - setenvpf(&env, &envc, "SLURM_JOBID=%u", jobid); - setenvpf(&env, &envc, "SLURM_UID=%u", uid); + memset(argv[0], 0, len); + strncpy(argv[0], "slurmd", 6); + strncpy((*argv)+7, arg, strlen(arg)); - execve(path, argv, env); - error("help! 
%m"); - exit(127); - } + done: + xfree(arg); + return; +} - do { - if (waitpid(cpid, &status, 0) < 0) { - if (errno != EINTR) - return -1; - } else - return status; - } while(1); - /* NOTREACHED */ -} diff --git a/src/slurmd/no_interconnect.c b/src/slurmd/no_interconnect.c index 0102a698bb6..45815b3fbd0 100644 --- a/src/slurmd/no_interconnect.c +++ b/src/slurmd/no_interconnect.c @@ -28,7 +28,10 @@ #include <src/slurmd/interconnect.h> #include <src/slurmd/setenvpf.h> -#include "src/slurmd/shm.h" +int interconnect_preinit (slurmd_job_t *job) +{ + return SLURM_SUCCESS; +} int interconnect_init (slurmd_job_t *job) { @@ -43,7 +46,7 @@ int interconnect_attach (slurmd_job_t *job, int taskid) /* * Set env variables needed for this interconnect */ -int interconnect_env(slurmd_job_t *job, int taskid) +int interconnect_env (slurmd_job_t *job, int taskid) { int cnt = job->envc; task_info_t *t = job->task[taskid]; @@ -60,7 +63,12 @@ int interconnect_env(slurmd_job_t *job, int taskid) return SLURM_SUCCESS; } -int interconnect_fini(slurmd_job_t *job) +int interconnect_fini (slurmd_job_t *job) +{ + return SLURM_SUCCESS; +} + +int interconnect_postfini (slurmd_job_t *job) { return SLURM_SUCCESS; } diff --git a/src/slurmd/req.c b/src/slurmd/req.c index e1f187b6baf..37b979de0dc 100644 --- a/src/slurmd/req.c +++ b/src/slurmd/req.c @@ -185,7 +185,7 @@ _launch_tasks(launch_tasks_request_msg_t *req, slurm_addr *cli) break; default: debug("created process %ld for job %d.%d", - pid, req->job_id, req->job_step_id); + pid, req->job_id, req->job_step_id); break; } @@ -336,7 +336,7 @@ _rpc_kill_tasks(slurm_msg_t *msg, slurm_addr *cli_addr) if (!(step = shm_get_step(req->job_id, req->job_step_id))) { debug("kill for nonexistent job %d.%d requested", - req->job_id, req->job_step_id); + req->job_id, req->job_step_id); rc = ESLURM_INVALID_JOB_ID; goto done; } @@ -436,7 +436,7 @@ _rpc_reattach_tasks(slurm_msg_t *msg, slurm_addr *cli) memcpy(&resp_msg.address, cli, sizeof(slurm_addr)); slurm_set_addr(&resp_msg.address, req->resp_port, NULL); - if ((step = shm_get_step(req->job_id, req->job_step_id)) < 0) { + if (!(step = shm_get_step(req->job_id, req->job_step_id))) { rc = ESRCH; goto done; } diff --git a/src/slurmd/shm.c b/src/slurmd/shm.c index cd92ae3ce54..11c3c2a375b 100644 --- a/src/slurmd/shm.c +++ b/src/slurmd/shm.c @@ -79,7 +79,7 @@ #define SHM_LOCKNAME "/.slurm.lock" /* Increment SHM_VERSION if format changes */ -#define SHM_VERSION 1004 +#define SHM_VERSION 1005 /* These macros convert shared memory pointers to local memory * pointers and back again. 
Pointers in shared memory are relative @@ -422,13 +422,18 @@ shm_signal_step(uint32_t jobid, uint32_t stepid, uint32_t signal) if ((i = _shm_find_step(jobid, stepid)) >= 0) { s = &slurmd_shm->step[i]; for (t = _taskp(s->task_list); t; t = _taskp(t->next)) { - if (getsid(t->pid) != s->sid) + + if (getsid(t->pid) != s->sid) { + error ("Task pid is not in my session!"); continue; + } + if (t->pid > 0 && kill(t->pid, signo) < 0) { error("kill %d.%d task %d pid %ld: %m", jobid, stepid, t->id, (long)t->pid); retval = errno; } + } } else retval = ESRCH; @@ -486,7 +491,6 @@ shm_get_step_owner(uint32_t jobid, uint32_t stepid) } - /* * Free a job step structure in local memory */ @@ -503,6 +507,21 @@ shm_free_step(job_step_t *step) } while ((t = p)); } +int +shm_update_step_mpid(uint32_t jobid, uint32_t stepid, int mpid) +{ + int i, retval = SLURM_SUCCESS; + _shm_lock(); + if ((i = _shm_find_step(jobid, stepid)) >= 0) + slurmd_shm->step[i].mpid = mpid; + else { + slurm_seterrno(ESRCH); + retval = SLURM_FAILURE; + } + _shm_unlock(); + return retval; +} + int shm_update_step_sid(uint32_t jobid, uint32_t stepid, int sid) { @@ -593,8 +612,8 @@ shm_update_step_addrs(uint32_t jobid, uint32_t stepid, s->io_update = true; debug3("Going to send shm update signal to %ld", - s->sid); - if (kill(s->sid, SIGHUP) < 0) { + s->mpid); + if ((s->mpid > 0) && (kill(s->mpid, SIGHUP) < 0)) { slurm_seterrno(EPERM); retval = SLURM_FAILURE; } @@ -617,8 +636,14 @@ shm_step_addrs(uint32_t jobid, uint32_t stepid, slurm_addr *ioaddr, slurm_addr *respaddr, srun_key_t *key) { int i, retval = SLURM_SUCCESS; - xassert(ioaddr != NULL); + + xassert(jobid >= 0); + xassert(stepid >= 0); + + xassert(ioaddr != NULL); xassert(respaddr != NULL); + xassert(key != NULL); + _shm_lock(); if ((i = _shm_find_step(jobid, stepid)) >= 0) { job_step_t *s = &slurmd_shm->step[i]; diff --git a/src/slurmd/shm.h b/src/slurmd/shm.h index ffe388a1156..1220543e814 100644 --- a/src/slurmd/shm.h +++ b/src/slurmd/shm.h @@ -73,7 +73,7 @@ struct task { int global_id; /* global task id */ pid_t pid; /* pid of user process */ pid_t ppid; /* parent pid of user process */ - pid_t mpid; /* manager pid of this task */ + /* reverse pointer back to controlling job step */ job_step_t *job_step; task_t *next; /* next task in this job step */ @@ -83,20 +83,23 @@ struct job_step { uid_t uid; uint32_t jobid; uint32_t stepid; - uint32_t sw_id; /* Switch/Interconnect specific id */ - int ntasks; /* number of tasks in this job */ - pid_t sid; /* Job session id */ - char exec_name[MAXPATHLEN]; /* Executable's pathname */ + uint32_t sw_id; /* Switch/Interconnect specific id */ + int ntasks; /* number of tasks in this job */ + pid_t mpid; /* Job manager pid */ + pid_t sid; /* Job session id (smgr pid) */ + + /* Executable's pathname */ + char exec_name[MAXPATHLEN]; - int io_update; /* srun address has been updated */ - slurm_addr respaddr; /* Addr to send messages to srun on */ - slurm_addr ioaddr; /* Addr to connect to initialize IO */ - srun_key_t key; /* last key from srun client */ + int io_update; /* srun address has been updated */ + slurm_addr respaddr; /* Addr to send messages to srun on */ + slurm_addr ioaddr; /* Addr to connect to initialize IO */ + srun_key_t key; /* last key from srun client */ - job_state_t state; /* Job step status */ - time_t timelimit; /* job time limit */ - task_t *task_list; /* list of this step's tasks */ + job_state_t state; /* Job step status */ + time_t timelimit; /* job time limit */ + task_t *task_list; /* list of this step's tasks */ }; 
@@ -203,6 +206,11 @@ int shm_add_task(uint32_t jobid, uint32_t stepid, task_t *task); */ int shm_update_step_sid(uint32_t jobid, uint32_t stepid, int sid); +/* + * update job step "manager" pid + */ +int shm_update_step_mpid(uint32_t jobid, uint32_t stepid, int mpid); + /* * update job step state diff --git a/src/slurmd/slurmd.c b/src/slurmd/slurmd.c index b08459ba7dd..9563e9c1a8b 100644 --- a/src/slurmd/slurmd.c +++ b/src/slurmd/slurmd.c @@ -426,11 +426,13 @@ _read_config() path_pubkey = xstrdup(conf->cf.job_credential_public_certificate); + if (!conf->logfile) + conf->logfile = xstrdup(conf->cf.slurmd_logfile); + _free_and_set(&conf->epilog, xstrdup(conf->cf.epilog)); _free_and_set(&conf->prolog, xstrdup(conf->cf.prolog)); _free_and_set(&conf->tmpfs, xstrdup(conf->cf.tmp_fs)); _free_and_set(&conf->spooldir, xstrdup(conf->cf.slurmd_spooldir)); - _free_and_set(&conf->logfile, xstrdup(conf->cf.slurmd_logfile)); _free_and_set(&conf->pidfile, xstrdup(conf->cf.slurmd_pidfile)); _free_and_set(&conf->pubkey, path_pubkey); diff --git a/src/slurmd/smgr.c b/src/slurmd/smgr.c new file mode 100644 index 00000000000..16d934c9b5f --- /dev/null +++ b/src/slurmd/smgr.c @@ -0,0 +1,410 @@ +/*****************************************************************************\ + * slurmd/smgr.c - session manager functions for slurmd + * $Id$ + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark A. Grondona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#if HAVE_CONFIG_H +# include "config.h" +#endif + +#include <sys/wait.h> +#include <sys/stat.h> +#include <sys/param.h> +#include <sys/ptrace.h> +#include <unistd.h> +#include <pwd.h> +#include <grp.h> +#include <string.h> + +#if HAVE_STDLIB_H +# include <stdlib.h> +#endif + +#if HAVE_SYS_TYPES_H +# include <sys/types.h> +#endif + +#include <slurm/slurm_errno.h> + +#include "src/common/log.h" +#include "src/common/xsignal.h" + +#include "src/slurmd/smgr.h" +#include "src/slurmd/ulimits.h" +#include "src/slurmd/interconnect.h" +#include "src/slurmd/io.h" + +/* + * Static prototype definitions. 
+ */ +static void _session_mgr(slurmd_job_t *job); +static int _exec_all_tasks(slurmd_job_t *job); +static void _exec_task(slurmd_job_t *job, int i); +static int _become_user(slurmd_job_t *job); +static void _wait_for_all_tasks(slurmd_job_t *job); +static int _send_exit_status(slurmd_job_t *job, int fd, int id, int status); +static int _writen(int fd, void *buf, size_t nbytes); +static int _unblock_all_signals(void); +static void _cleanup_file_descriptors(slurmd_job_t *job); + +/* parallel debugger support */ +static void _pdebug_trace_process(slurmd_job_t *job, pid_t pid); +static void _pdebug_stop_current(slurmd_job_t *job); + +/* + * Create the slurmd session manager process + */ +pid_t +smgr_create(slurmd_job_t *job) +{ + pid_t pid; + switch ((pid = fork())) { + case -1: + error("smgr_create: fork: %m"); + return pid; + break; + case 0: /* child */ + close(job->fdpair[0]); + _session_mgr(job); + /* NOTREACHED */ + break; + } + + /* parent continues here */ + + close(job->fdpair[1]); + + return pid; +} + +static void +_session_mgr(slurmd_job_t *job) +{ + xassert(job != NULL); + + /* _cleanup_file_descriptors(job); */ + + /* + * Call interconnect_init() before becoming user + */ + if (!job->batch && (interconnect_init(job) < 0)) { + error("interconnect_init: %m"); + exit(1); + } + + if (_become_user(job) < 0) + exit(2); + + if (setsid() < (pid_t) 0) { + error("setsid: %m"); + exit(3); + } + + if (chdir(job->cwd) < 0) { + error("couldn't chdir to `%s': %m: going to /tmp instead", + job->cwd); + if (chdir("/tmp") < 0) { + error("couldn't chdir to /tmp either. dying."); + exit(4); + } + } + + if (set_user_limits(job) < 0) { + debug("Unable to set user limits"); + exit(5); + } + + if (_exec_all_tasks(job) < 0) { + debug("exec_all_tasks failed"); + exit(6); + } + + _cleanup_file_descriptors(job); + + _wait_for_all_tasks(job); + + if (!job->batch && (interconnect_fini(job) < 0)) { + error("interconnect_fini: %m"); + exit(1); + } + + exit(SLURM_SUCCESS); +} + +/* Close write end of stdin (at the very least) + */ +static void +_cleanup_file_descriptors(slurmd_job_t *j) +{ + int i; + for (i = 0; i < j->ntasks; i++) { + close(j->task[i]->pin[1]); /* Ignore errors */ + close(j->task[i]->pout[0]); + + /* Leave stderr open for slurmd error logging + */ + } +} + +static int +_become_user(slurmd_job_t *job) +{ + if (setgid(job->pwd->pw_gid) < 0) { + error("setgid: %m"); + return -1; + } + + if (initgroups(job->pwd->pw_name, job->pwd->pw_gid) < 0) { + ; + /* error("initgroups: %m"); */ + } + + if (setuid(job->pwd->pw_uid) < 0) { + error("setuid: %m"); + return -1; + } + + return 0; +} + + +/* Execute N tasks and send pids back to job manager process. 
+ */ +static int +_exec_all_tasks(slurmd_job_t *job) +{ + int i; + int fd = job->fdpair[1]; + + xassert(job != NULL); + xassert(fd >= 0); + + for (i = 0; i < job->ntasks; i++) { + pid_t pid = fork(); + + if (pid < 0) { + error("fork: %m"); + return SLURM_ERROR; + } else if (pid == 0) /* child */ + _exec_task(job, i); + + /* Parent continue: + */ + + debug2("pid %ld forked child process %ld for local task %d", + getpid(), (long) pid, i); + + /* + * Send pid to job manager + */ + if (_writen(fd, (char *)&pid, sizeof(pid_t)) < 0) { + error("unable to update task pid!: %m"); + return SLURM_ERROR; + } + + job->task[i]->pid = pid; + + /* + * Prepare process for attach by parallel debugger + * (if specified and able) + */ + _pdebug_trace_process(job, pid); + } + + return SLURM_SUCCESS; +} + +static void +_exec_task(slurmd_job_t *job, int i) +{ + log_options_t opts = LOG_OPTS_STDERR_ONLY; + + io_prepare_child(job->task[i]); + + /* + * Reinitialize slurm log facility to send errors back to client + */ + log_init("slurmd", opts, 0, NULL); + + if (_unblock_all_signals() < 0) { + error("unable to unblock signals"); + exit(1); + } + + if (!job->batch) { + if (interconnect_attach(job, i) < 0) { + error("Unable to attach to interconnect: %m"); + exit(1); + } + + if (interconnect_env(job, i) < 0) + error("error establishing env for interconnect: %m"); + + _pdebug_stop_current(job); + } + + execve(job->argv[0], job->argv, job->env); + + /* + * error() and clean up if execve() returns: + */ + error("execve(): %s: %m", job->argv[0]); + exit(errno); +} + + + +/* wait for N tasks to exit, reporting exit status back to slurmd mgr + * process over file descriptor fd. + * + */ +static void +_wait_for_all_tasks(slurmd_job_t *job) +{ + int waiting = job->ntasks; + int i = 0; + int id = 0; + int fd = job->fdpair[1]; + + while (waiting > 0) { + int status = 0; + pid_t pid; + + if ((pid = waitpid(0, &status, 0)) < (pid_t) 0) { + if (errno != EINTR) + error("waitpid: %m"); + continue; + } + + for (i = 0; i < job->ntasks; i++) { + if (job->task[i]->pid == pid) { + waiting--; + id = i; + break; + } + } + + _send_exit_status(job, fd, id, status); + status = 0; + } + return; +} + +static int +_send_exit_status(slurmd_job_t *job, int fd, int tid, int status) +{ + exit_status_t e; + int len; + + e.taskid = tid; + e.status = status; + + len = _writen(fd, &e, sizeof(e)); + + debug("task %d exited with status %d", tid, status); + + return len; +} + +/* + * Prepare task for parallel debugger attach + */ +static void +_pdebug_trace_process(slurmd_job_t *job, pid_t pid) +{ +#if HAVE_TOTALVIEW + /* If task to be debugged, wait for it to stop via + * child's ptrace(PTRACE_TRACEME), then SIGSTOP, and + * ptrace(PTRACE_DETACH). 
This requires a kernel patch, + * which you probably already have in place for TotalView: + * http://hypermail.idiosynkrasia.net + * /linux-kernel/archived/2001/week51/1193.html + */ + + if (job->task_flags & TASK_TOTALVIEW_DEBUG) { + int status; + waitpid(pid, &status, WUNTRACED); + if (kill(pid, SIGSTOP) < 0) + error("kill(%ld): %m", (long) pid); + if (ptrace(PTRACE_DETACH, (long) pid, NULL, NULL)) + error("ptrace(%ld): %m", (long) pid); + } +#endif /* HAVE_TOTALVIEW */ +} + +/* + * Stop current task on exec() for connection from a parallel debugger + */ +static void +_pdebug_stop_current(slurmd_job_t *job) +{ +#if HAVE_TOTALVIEW + /* + * Stop the task on exec for TotalView to connect + */ + if ( (job->task_flags & TASK_TOTALVIEW_DEBUG) + && (ptrace(PTRACE_TRACEME, 0, NULL, NULL) < 0) ) + error("ptrace: %m"); +#endif +} + + +static int +_writen(int fd, void *buf, size_t nbytes) +{ + int n = 0; + char *pbuf = (char *) buf; + size_t nleft = nbytes; + + while (nleft > 0) { + if ((n = write(fd, (void *) pbuf, nleft)) >= 0) { + pbuf+=n; + nleft-=n; + } else if (errno == EINTR) + continue; + else { + debug("write: %m"); + break; + } + } + return(n); +} + +static int +_unblock_all_signals(void) +{ + sigset_t set; + if (sigfillset(&set)) { + error("sigfillset: %m"); + return SLURM_ERROR; + } + if (sigprocmask(SIG_UNBLOCK, &set, NULL)) { + error("sigprocmask: %m"); + return SLURM_ERROR; + } + return SLURM_SUCCESS; +} + + diff --git a/src/slurmd/smgr.h b/src/slurmd/smgr.h new file mode 100644 index 00000000000..f1c1a7a3549 --- /dev/null +++ b/src/slurmd/smgr.h @@ -0,0 +1,64 @@ +/*****************************************************************************\ + * src/slurmd/smgr.h - session manager functions for slurmd + * $Id$ + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#ifndef _SMGR_H +#define _SMGR_H + +#if HAVE_CONFIG_H +# include "config.h" +#endif + +#include <slurm/slurm_errno.h> + +#if HAVE_SYS_TYPES_H +# include <sys/types.h> +#endif /* HAVE_SYS_TYPES_H */ + +#include "src/slurmd/job.h" + +/* + * Task exit code information + */ +typedef struct exit_status { + int taskid; + int status; +} exit_status_t; + + +/* + * Create the session manager process, which starts a new session + * and runs as the UID of the job owner. 
The session manager process + * will wait for all tasks in the job to exit (sending task exit messages + * as appropriate), and then exit itself. + * + * If the smgr process is successfully created, the pid of the new + * process is returned. On error, (pid_t) -1 is returned. + * + */ +pid_t smgr_create(slurmd_job_t *job); + +#endif /* !_SMGR_H */ diff --git a/src/slurmd/ulimits.c b/src/slurmd/ulimits.c new file mode 100644 index 00000000000..28566dfd8d5 --- /dev/null +++ b/src/slurmd/ulimits.c @@ -0,0 +1,139 @@ +/*****************************************************************************\ + * src/slurmd/ulimits.c - set user limits for job + * $Id$ + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#if HAVE_CONFIG_H +# include "config.h" +#endif + +#include <sys/resource.h> +#include <unistd.h> +#include <stdlib.h> +#include <string.h> + +#include "src/common/log.h" + +#include "src/slurmd/job.h" + +struct userlim { + char *var; + int resource; +}; + +static struct userlim ulims[] = + { { "SLURM_RLIMIT_CORE" , RLIMIT_CORE }, + { "SLURM_RLIMIT_FSIZE" , RLIMIT_FSIZE }, + { "SLURM_RLIMIT_NPROC" , RLIMIT_NPROC }, + { "SLURM_RLIMIT_NOFILE", RLIMIT_NOFILE}, + { NULL, 0 } }; + +/* + * Prototypes: + * + */ +static char * _getenvp(char **env, const char *name); +static long _get_env_val(char **env, const char *name); +static int _set_limit(char **env, struct userlim *ulim); + + +/* + * Set all user limits off environment variables as detailed in + * the local ulims[] var. Sets limits off environment variables + * in job->env. + */ +int set_user_limits(slurmd_job_t *job) +{ + struct userlim *uptr = &ulims[0]; + + while (uptr && (uptr->var != NULL)) { + _set_limit(job->env, uptr); + uptr++; + } + + return SLURM_SUCCESS; +} + +static int +_set_limit(char **env, struct userlim *u) +{ + long val; + int retval = -1; + struct rlimit r; + + if ((val = _get_env_val(env, u->var)) > -2L) { + getrlimit(u->resource, &r); + + r.rlim_cur = (val == -1L) ? 
RLIM_INFINITY : (rlim_t) val; + + if ((retval = setrlimit(u->resource, &r)) < 0) + error("setrlimit(%s, %ld): %m", u->var+5, val); + } + + return retval; +} + + +static long +_get_env_val(char **env, const char *name) +{ + char *val = NULL; + char *p = NULL; + long retval = 0L; + + xassert(env != NULL); + xassert(name != NULL); + + if(!(val = _getenvp(env, name))) + return -2L; + + retval = strtol(val, &p, 10); + + if (p && (*p != '\0')) { + error("Invalid %s env var, value = `%s'", name, val); + return -2L; + } + + return retval; +} + +static char * +_getenvp(char **env, const char *name) +{ + size_t len = strlen(name); + char **ep; + + if ((env == NULL) || (env[0] == '\0')) + return NULL; + + for (ep = env; *ep != NULL; ++ep) { + if (!strncmp(*ep, name, len) && ((*ep)[len] == '=')) + return &(*ep)[len+1]; + } + + return NULL; +} + + diff --git a/src/slurmd/ulimits.h b/src/slurmd/ulimits.h new file mode 100644 index 00000000000..a89f5f8b794 --- /dev/null +++ b/src/slurmd/ulimits.h @@ -0,0 +1,38 @@ +/*****************************************************************************\ + * src/slurmd/ulimits.h - functions to set user resource limits in slurmd + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#ifndef _SLURMD_ULIMITS_H +#define _SLURMD_ULIMITS_H + +#include "src/slurmd/job.h" + +/* + * Set user resource limits as defined by SLURM_RLIMIT* environment + * variables contained in job->env + */ +int set_user_limits(slurmd_job_t *job); + +#endif /* !_SLURMD_ULIMITS_H */ -- GitLab
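
As an illustration of the dual-process model described in the commit message, here is a minimal, self-contained C sketch (not the slurmd code itself) of the pattern this patch introduces: a root "job manager" forks a "session manager", which forks the tasks and writes their pids back up a pipe, the role played by job->fdpair in the patch. NTASKS and readn() are illustrative stand-ins; readn() merely echoes the _readn()/_writen() helpers in mgr.c and smgr.c.

/*
 * Sketch: root "job manager" forks a "session manager", which forks the
 * tasks and reports their pids back over a pipe -- the role job->fdpair
 * plays in this patch.  Illustrative only, not the slurmd code itself.
 */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define NTASKS 4                        /* illustrative task count */

/* Read exactly n bytes, retrying on EINTR (cf. _readn() in mgr.c). */
static ssize_t readn(int fd, void *buf, size_t n)
{
        char   *p    = buf;
        size_t  left = n;

        while (left > 0) {
                ssize_t rc = read(fd, p, left);
                if (rc < 0 && errno == EINTR)
                        continue;
                if (rc <= 0)
                        return rc;      /* error or EOF */
                p    += rc;
                left -= (size_t) rc;
        }
        return (ssize_t) n;
}

int main(void)
{
        int   fdpair[2];
        pid_t smgr;
        int   i;

        if (pipe(fdpair) < 0) {
                perror("pipe");
                return 1;
        }

        if ((smgr = fork()) < 0) {
                perror("fork");
                return 1;
        }

        if (smgr == 0) {                /* session manager: would setsid(),  */
                close(fdpair[0]);       /* setuid() to the job owner, etc.   */
                for (i = 0; i < NTASKS; i++) {
                        pid_t pid = fork();
                        if (pid < 0)
                                _exit(1);
                        if (pid == 0)   /* task: would exec the user command */
                                _exit(0);
                        /* report the task pid up to the job manager */
                        if (write(fdpair[1], &pid, sizeof(pid)) < 0)
                                _exit(1);
                }
                while (wait(NULL) > 0)  /* wait for every task to exit */
                        ;
                _exit(0);
        }

        /* job manager (stays root): collect task pids from the pipe */
        close(fdpair[1]);
        for (i = 0; i < NTASKS; i++) {
                pid_t pid;
                if (readn(fdpair[0], &pid, sizeof(pid)) <= 0) {
                        fprintf(stderr, "session manager exited early\n");
                        break;
                }
                printf("task %d has pid %ld\n", i, (long) pid);
        }
        waitpid(smgr, NULL, 0);         /* reap the session manager */
        return 0;
}

In the patch itself the session manager additionally runs interconnect_init(), setsid(), set_user_limits() and setuid() before forking tasks, and later writes each task's exit status (exit_status_t) back over the same pipe so the job manager can forward it to srun as a MESSAGE_TASK_EXIT.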
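
The new ulimits.c sets per-job resource limits from SLURM_RLIMIT_* variables found in job->env. Below is a hedged sketch of the same idea against the ordinary process environment: the table is a subset of the one in the patch (which also covers RLIMIT_NPROC), set_user_limits_from_env() is an illustrative name rather than the patch's set_user_limits(), and, as in the patch, a value of -1 maps to RLIM_INFINITY.

/*
 * Sketch of the SLURM_RLIMIT_* handling that ulimits.c adds: walk a
 * small table of environment variables and apply each value found via
 * setrlimit(), treating -1 as "unlimited".  Illustrative only; the
 * patch's set_user_limits() reads job->env rather than getenv() and
 * also handles RLIMIT_NPROC.
 */
#include <stdio.h>
#include <stdlib.h>
#include <sys/resource.h>

struct userlim {
        const char *var;
        int         resource;
};

static const struct userlim ulims[] = {
        { "SLURM_RLIMIT_CORE",   RLIMIT_CORE   },
        { "SLURM_RLIMIT_FSIZE",  RLIMIT_FSIZE  },
        { "SLURM_RLIMIT_NOFILE", RLIMIT_NOFILE },
        { NULL, 0 }
};

/* illustrative name; not an identifier from the patch */
static int set_user_limits_from_env(void)
{
        const struct userlim *u;

        for (u = ulims; u->var != NULL; u++) {
                const char   *val = getenv(u->var);
                char         *end = NULL;
                long          lim;
                struct rlimit r;

                if (val == NULL)
                        continue;               /* variable not set */

                lim = strtol(val, &end, 10);
                if (end == val || *end != '\0') {
                        fprintf(stderr, "invalid %s value `%s'\n",
                                u->var, val);
                        continue;
                }

                if (getrlimit(u->resource, &r) < 0)
                        continue;
                r.rlim_cur = (lim == -1L) ? RLIM_INFINITY : (rlim_t) lim;
                if (setrlimit(u->resource, &r) < 0)
                        perror(u->var);
        }
        return 0;
}

int main(void)
{
        /* e.g.:  SLURM_RLIMIT_NOFILE=1024 ./a.out */
        return set_user_limits_from_env();
}

For example, running the sketch as SLURM_RLIMIT_NOFILE=1024 ./a.out would set the process's soft open-file limit to 1024 before any job tasks were exec'd.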