diff --git a/src/slurmd/Makefile.am b/src/slurmd/Makefile.am
index 7fc5803090293bbcc5df6a231c7ce8f67141ae92..84514f662dc52cd7a79f58de30fbb6296f14683e 100644
--- a/src/slurmd/Makefile.am
+++ b/src/slurmd/Makefile.am
@@ -27,6 +27,7 @@ common_sources = \
 	slurmd.c slurmd.h \
 	req.c req.h \
 	mgr.c mgr.h \
+	smgr.c smgr.h \
 	get_mach_stat.c \
 	get_mach_stat.h \
 	read_proc.c \
@@ -35,6 +36,7 @@ common_sources = \
 	semaphore.c semaphore.h \
 	shm.c shm.h \
 	fname.c fname.h \
+	ulimits.c ulimits.h \
 	setenvpf.c setenvpf.h \
 	interconnect.h
diff --git a/src/slurmd/elan_interconnect.c b/src/slurmd/elan_interconnect.c
index 25f1211735f6ad866ba07e1c305458d0aa5fc1b6..5c0bdeaa5f2708ecea5fdf364956fa3b878b4a22 100644
--- a/src/slurmd/elan_interconnect.c
+++ b/src/slurmd/elan_interconnect.c
@@ -46,19 +46,12 @@
 #include "src/slurmd/shm.h"
 
 static int
-_wait_and_destroy_prg(qsw_jobinfo_t qsw_job, pid_t pid)
+_wait_and_destroy_prg(qsw_jobinfo_t qsw_job)
 {
 	int i = 0;
 	int sleeptime = 1;
 
-	debug3("waiting to destory program description...");
-  again:
-	if (waitpid(pid, NULL, 0) < 0) {
-		if (errno == EINTR)
-			goto again;
-		error("waitpid: %m");
-		exit(1);
-	}
+	debug3("going to destroy program description...");
 
 	while(qsw_prgdestroy(qsw_job) < 0) {
 		i++;
@@ -78,8 +71,12 @@ _wait_and_destroy_prg(qsw_jobinfo_t qsw_job, pid_t pid)
 	}
 
 	debug("destroyed program description");
+	return SLURM_SUCCESS;
+}
 
-	exit(0);
+int
+interconnect_preinit(slurmd_job_t *job)
+{
 	return SLURM_SUCCESS;
 }
 
@@ -89,27 +86,11 @@ _wait_and_destroy_prg(qsw_jobinfo_t qsw_job, pid_t pid)
 int
 interconnect_init(slurmd_job_t *job)
 {
-	pid_t pid;
-
-	/* Process 1: */
-	switch ((pid = fork()))
-	{
-		case -1:
-			error ("elan_interconnect_prepare fork(): %m");
-			return SLURM_ERROR ;
-		case 0: /* child falls thru */
-			break;
-		default: /* parent */
-			_wait_and_destroy_prg(job->qsw_job, pid);
-			/*NOTREACHED*/
-	}
-
-	/* Process 2: */
-	debug("calling qsw_prog_init from process %ld", getpid());
+	debug2("calling interconnect_init from process %ld", getpid());
 	if (qsw_prog_init(job->qsw_job, job->uid) < 0) {
 		error ("elan interconnect_init: qsw_prog_init: %m");
 		/* we may lose the following info if not logging to stderr */
-		qsw_print_jobinfo(stderr, job->qsw_job);
+		qsw_print_jobinfo(log_fp(), job->qsw_job);
 		return SLURM_ERROR;
 	}
 
@@ -119,8 +100,17 @@ interconnect_init(slurmd_job_t *job)
 int
 interconnect_fini(slurmd_job_t *job)
 {
+	qsw_prog_fini(job->qsw_job);
+	return SLURM_SUCCESS;
+}
+
+int
+interconnect_postfini(slurmd_job_t *job)
+{
+	_wait_and_destroy_prg(job->qsw_job);
 	return SLURM_SUCCESS;
 }
+
 int
 interconnect_attach(slurmd_job_t *job, int procid)
 {
diff --git a/src/slurmd/interconnect.h b/src/slurmd/interconnect.h
index e9984cd8b4874609a8d2e690959f99745bbfcf96..c156fc7562a0948b3b086215cf87d4b1f0d5755f 100644
--- a/src/slurmd/interconnect.h
+++ b/src/slurmd/interconnect.h
@@ -32,23 +32,74 @@
 #include "src/common/slurm_protocol_api.h"
 #include "src/slurmd/job.h"
 
+/*
+ * Notes:
+ *
+ * Interconnect functions are run within slurmd in the following way:
+ * (Diagram courtesy of Jim Garlick [see qsw.c] )
+ *
+ * Process 1 (root)       Process 2 (root, user)    |  Process 3 (user task)
+ *                                                  |
+ * interconnect_preinit                             |
+ * fork ------------------ interconnect_init        |
+ * waitpid                 setuid, chdir, etc.      |
+ *                         fork N procs -----------+--- interconnect_attach
+ *                         wait all                 |   interconnect_env
+ *                                                  |   exec mpi process
+ *                         interconnect_fini*       |
+ * interconnect_postfini                            |
+ *                                                  |
+ *
+ * [ *Note: interconnect_fini() is run as the uid of the job owner, not root ]
+ */
+
+
+/*
+ * Prepare node for job.
+ *
+ * preinit is run as root in the first slurmd process, the so-called job
+ * manager. This function can be used to perform any initialization
+ * that needs to be performed in the same process as interconnect_postfini()
+ *
+ */
+int interconnect_preinit(slurmd_job_t *job);
+
 /*
- * initialize interconnect on node
+ * initialize interconnect on node. This function is run from the
+ * 2nd slurmd process (some interconnect implementations may require
+ * interconnect init functions to be executed from a separate process
+ * than the process executing interconnect_fini() [e.g. QsNet])
+ *
  */
 int interconnect_init(slurmd_job_t *job);
 
 /*
- * finalize and detach from interconnect on node
+ * This function is run from the same process as interconnect_init()
+ * after all job tasks have exited. It is *not* run as root, because
+ * the process in question has already setuid to the job owner.
+ *
  */
 int interconnect_fini(slurmd_job_t *job);
 
+/*
+ * Finalize interconnect on node.
+ *
+ * This function is run from the initial slurmd process (same process
+ * as interconnect_preinit()), and is run as root. Any cleanup routines
+ * that need to be run with root privileges should be run from this
+ * function.
+ */
+int interconnect_postfini(slurmd_job_t *job);
+
 /*
  * attach process to interconnect
+ *
  */
 int interconnect_attach(slurmd_job_t *job, int taskid);
 
 /*
  * Set environment variables needed.
+ *
  */
 int interconnect_env(slurmd_job_t *job, int taskid);
diff --git a/src/slurmd/io.c b/src/slurmd/io.c
index 27511531d11cd600529776daf07184cdc849ace9..5d1ed247d457b947031ce6781548964982fd0050 100644
--- a/src/slurmd/io.c
+++ b/src/slurmd/io.c
@@ -445,6 +445,8 @@ _io_add_connecting(slurmd_job_t *job, task_info_t *t, srun_info_t *srun,
 	io_obj_t *obj = NULL;
 	int sock = -1;
 
+	debug3("in io_add_connecting");
+
 	if ((sock = (int) slurm_open_stream(&srun->ioaddr)) < 0) {
 		error("connect io: %m");
 		/* XXX retry or silently fail?
@@ -470,6 +472,8 @@ _io_add_connecting(slurmd_job_t *job, task_info_t *t, srun_info_t *srun, list_append(job->objs, (void *)obj); + debug3("Now handling %d IO objects", list_count(job->objs)); + return SLURM_SUCCESS; } @@ -501,7 +505,7 @@ _io_prepare_one(slurmd_job_t *j, task_info_t *t, srun_info_t *s) } if (!list_find_first(t->srun_list, (ListFindF) find_obj, s)) { - debug("appending new client to srun_list for task %d", t->gid); + debug3("appending new client to srun_list for task %d", t->gid); list_append(t->srun_list, (void *) s); } @@ -543,7 +547,9 @@ io_prepare_clients(slurmd_job_t *job) return SLURM_FAILURE; /* kick IO thread */ - pthread_kill(job->ioid, SIGHUP); + debug3("sending sighup to io thread id %ld", job->ioid); + if (pthread_kill(job->ioid, SIGHUP) < 0) + error("pthread_kill: %m"); } return SLURM_SUCCESS; diff --git a/src/slurmd/job.c b/src/slurmd/job.c index dcfa6b9b2d6047fd53457b21728ee0432c4a0594..7209cca8ab7ede6c3690866da227ac0fea8474a3 100644 --- a/src/slurmd/job.c +++ b/src/slurmd/job.c @@ -153,6 +153,11 @@ job_create(launch_tasks_request_msg_t *msg, slurm_addr *cli_addr) _job_init_task_info(job, msg->global_task_ids); + if (pipe(job->fdpair) < 0) { + error("pipe: %m"); + return NULL; + } + return job; } @@ -209,6 +214,11 @@ job_batch_job_create(batch_job_launch_msg_t *msg) */ job->argv = (char **) xmalloc(job->argc * sizeof(char *)); + if (pipe(job->fdpair) < 0) { + error("pipe: %m"); + return NULL; + } + _job_init_task_info(job, &global_taskid); return job; diff --git a/src/slurmd/job.h b/src/slurmd/job.h index 7cbb03aa219e0f4279a4ce5a8b71c24da8384266..ba3d9614a50f111d820b352076fd5043ca4ecf97 100644 --- a/src/slurmd/job.h +++ b/src/slurmd/job.h @@ -84,31 +84,40 @@ typedef struct srun_info { } srun_info_t; typedef struct slurmd_job { - uint32_t jobid; - uint32_t stepid; - uint32_t nnodes; - uint32_t nprocs; - uint32_t nodeid; - uint32_t ntasks; - uint32_t debug; - uint16_t envc; - uint16_t argc; - bool batch; - bool run_prolog; /* need to run prolog */ - char **env; - char **argv; - char *cwd; + uint32_t jobid; /* Current SLURM job id */ + uint32_t stepid; /* Current step id (or NO_VAL) */ + uint32_t nnodes; /* number of nodes in current job */ + uint32_t nprocs; /* total number of processes in current job */ + uint32_t nodeid; /* relative position of this node in job */ + uint32_t ntasks; /* number of tasks on *this* node */ + uint32_t debug; /* debug level for job slurmd */ + uint16_t envc; /* Environment variable count */ + uint16_t argc; /* number of commandline arguments */ + char **env; /* job environment */ + char **argv; /* job argument vector */ + char *cwd; /* path to current working directory */ #ifdef HAVE_LIBELAN3 - qsw_jobinfo_t qsw_job; + qsw_jobinfo_t qsw_job; /* Elan-specific job information */ #endif - uid_t uid; - struct passwd *pwd; - time_t timelimit; - task_info_t **task; - List objs; - List sruns; - pthread_t ioid; - uint16_t task_flags; + uid_t uid; /* user id for job */ + + bool batch; /* true if this is a batch job */ + bool run_prolog; /* true if need to run prolog */ + time_t timelimit; /* time at which job must stop */ + + struct passwd *pwd; /* saved passwd struct for user job */ + task_info_t **task; /* list of task information pointers */ + List objs; /* list of IO objects */ + List sruns; /* List of sruns */ + pthread_t ioid; /* pthread id of IO thread */ + + pid_t jmgr_pid; /* job manager pid */ + pid_t smgr_pid; /* session manager pid */ + + int fdpair[2]; /* file descriptor pair for */ + /* communication between slurmds */ + + 
uint16_t task_flags; } slurmd_job_t; diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c index 84e60fc5cc4c82965be49c5fce8604740fedc143..a412cbad52dc59c8210d16cc002aef2431c1ef85 100644 --- a/src/slurmd/mgr.c +++ b/src/slurmd/mgr.c @@ -57,90 +57,577 @@ #include "src/common/xstring.h" #include "src/common/xmalloc.h" +#include "src/slurmd/mgr.h" + #include "src/slurmd/slurmd.h" #include "src/slurmd/setenvpf.h" -#include "src/slurmd/mgr.h" +#include "src/slurmd/smgr.h" #include "src/slurmd/io.h" #include "src/slurmd/shm.h" #include "src/slurmd/interconnect.h" -static int _run_job(slurmd_job_t *job); -static int _exec_all_tasks(slurmd_job_t *job); -static void _task_exec(slurmd_job_t *job, int i); + +/* + * Map session manager exit status to slurm errno: + * Keep in sync with smgr.c exit codes. + */ +static int exit_errno[] = +{ 0, + ESLURM_INTERCONNECT_FAILURE, + ESLURMD_SET_UID_OR_GID_ERROR, + ESLURMD_SET_SID_ERROR, + ESCRIPT_CHDIR_FAILED, + -1, + ESLURMD_EXECVE_FAILED +}; + +#define MAX_SMGR_EXIT_STATUS 6 + + + +/* + * Prototypes + */ + +/* + * Job manager related prototypes + */ +static int _job_mgr(slurmd_job_t *job); +static int _setup_io(slurmd_job_t *job); static int _drop_privileges(struct passwd *pwd); static int _reclaim_privileges(struct passwd *pwd); -static int _become_user(slurmd_job_t *job); -static int _unblock_all_signals(void); static int _block_most_signals(void); -static int _send_exit_msg(int rc, task_info_t *t); -static int _complete_job(slurmd_job_t *job, int rc, int status); static void _send_launch_resp(slurmd_job_t *job, int rc); -static void _wait_for_all_tasks(slurmd_job_t *job); static void _slurmd_job_log_init(slurmd_job_t *job); +static int _update_shm_task_info(slurmd_job_t *job); +static int _readn(int fd, void *buf, size_t nbytes); +static int _create_job_session(slurmd_job_t *job); +static int _wait_for_task_exit(slurmd_job_t *job); +static int _wait_for_session(slurmd_job_t *job); +static void _wait_for_io(slurmd_job_t *job); +static void _handle_attach_req(slurmd_job_t *job); +static int _send_exit_msg(slurmd_job_t *job, int tid[], int n, int status); + +static void _setargs(slurmd_job_t *job, char **argv, int argc); + +/* + * Batch job mangement prototypes: + */ +static char * _make_batch_dir(slurmd_job_t *job); +static char * _make_batch_script(batch_job_launch_msg_t *msg, char *path); +static int _setup_batch_env(slurmd_job_t *job, char *nodes); +static int _complete_job(slurmd_job_t *job, int err, int status); -static void -_setargs(slurmd_job_t *job, char **argv, int argc) + +/* SIGHUP (empty) signal handler + */ +static void _hup_handler(int sig) {;} + +/* + * Launch an job step on the current node + */ +int +mgr_launch_tasks(launch_tasks_request_msg_t *msg, slurm_addr *cli) { - int i; - size_t len = 0; - char *arg = NULL; + slurmd_job_t *job = NULL; + char buf[256]; - for (i = 0; i < argc; i++) - len += strlen(argv[i]) + 1; + snprintf(buf, sizeof(buf), "[%d.%d]", msg->job_id, msg->job_step_id); + log_set_fpfx(buf); - if (job->stepid == NO_VAL) - xstrfmtcat(arg, "[%d]", job->jobid); - else - xstrfmtcat(arg, "[%d.%d]", job->jobid, job->stepid); + if (!(job = job_create(msg, cli))) + return SLURM_ERROR; - if (len < (strlen(arg) + 7)) - goto done; + _setargs(job, *conf->argv, *conf->argc); - memset(argv[0], 0, len); - strncpy(argv[0], "slurmd", 6); - strncpy((*argv)+7, arg, strlen(arg)); + if (_job_mgr(job) < 0) + return SLURM_ERROR; - done: - xfree(arg); - return; + return SLURM_SUCCESS; } -/* Launch a job step on this node +/* + * Launch a batch job 
script on the current node */ int -mgr_launch_tasks(launch_tasks_request_msg_t *msg, slurm_addr *cli) +mgr_launch_batch_job(batch_job_launch_msg_t *msg, slurm_addr *cli) { + int rc = 0; + int status = 0; slurmd_job_t *job; - char buf[256]; + char *batchdir; + char buf[256]; - snprintf(buf, sizeof(buf), "[%d.%d]", msg->job_id, msg->job_step_id); + snprintf(buf, sizeof(buf), "[%d]", msg->job_id); log_set_fpfx(buf); - /* New process, so we must reinit shm */ - if (shm_init() < 0) - goto error; - - if (!(job = job_create(msg, cli))) - goto error; + if (!(job = job_batch_job_create(msg))) + goto cleanup; _setargs(job, *conf->argv, *conf->argc); - verbose("running job step %d.%d for %s", - job->jobid, job->stepid, job->pwd->pw_name); + if ((batchdir = _make_batch_dir(job)) == NULL) + goto cleanup1; + + xfree(job->argv[0]); + + if ((job->argv[0] = _make_batch_script(msg, batchdir)) == NULL) + goto cleanup2; + + if ((rc = _setup_batch_env(job, msg->nodes)) < 0) + goto cleanup2; + + status = _job_mgr(job); + + cleanup2: + if (job->argv[0] && (unlink(job->argv[0]) < 0)) + error("unlink(%s): %m", job->argv[0]); + cleanup1: + if (batchdir && (rmdir(batchdir) < 0)) + error("rmdir(%s): %m", batchdir); + xfree(batchdir); + cleanup : + verbose("job %d completed with slurm_rc = %d, job_rc = %d", + job->jobid, rc, status); + _complete_job(job, rc, status); + return 0; +} + + + +/* + * Run a prolog or epilog script. + * returns -1 on failure. + * + */ +int +run_script(bool prolog, const char *path, uint32_t jobid, uid_t uid) +{ + int status; + pid_t cpid; + char *name = prolog ? "prolog" : "epilog"; + + if (path == NULL || path[0] == '\0') + return 0; + + debug("[job %d] attempting to run %s [%s]", jobid, name, path); + + if (access(path, R_OK | X_OK) < 0) { + debug("Not running %s [%s]: %m", name, path); + return 0; + } + + if ((cpid = fork()) < 0) { + error ("executing %s: fork: %m", name); + return -1; + } + if (cpid == 0) { + char *argv[4]; + char **env; + int envc = 0; + - /* Run job's tasks and wait for all tasks to exit. + env = xmalloc(sizeof(char *)); + + argv[0] = xstrdup(path); + argv[1] = NULL; + + env[0] = NULL; + setenvpf(&env, &envc, "SLURM_JOBID=%u", jobid); + setenvpf(&env, &envc, "SLURM_UID=%u", uid); + + execve(path, argv, env); + error("help! %m"); + exit(127); + } + + do { + if (waitpid(cpid, &status, 0) < 0) { + if (errno != EINTR) + return -1; + } else + return status; + } while(1); + + /* NOTREACHED */ +} + + +static int +_setup_io(slurmd_job_t *job) +{ + int rc = 0; + struct passwd *spwd = NULL; + + /* + * Save current UID/GID */ - if (_run_job(job) < 0) - goto error; + if (!(spwd = getpwuid(geteuid()))) { + error("getpwuid: %m"); + return ESLURMD_IO_ERROR; + } - debug2("%ld returned from slurmd_run_job()", getpid()); - shm_fini(); - return(SLURM_SUCCESS); - error: + if (io_spawn_handler(job) < 0) + return ESLURMD_IO_ERROR; + + /* + * Initialize log facility to copy errors back to srun + */ + _slurmd_job_log_init(job); + + /* + * Temporarily drop permissions, initialize IO clients + * (open files/connections for IO, etc), then reclaim privileges. + */ + if (_drop_privileges(job->pwd) < 0) + return ESLURMD_SET_UID_OR_GID_ERROR; + + rc = io_prepare_clients(job); + + if (_reclaim_privileges(spwd) < 0) + error("sete{u/g}id(%ld/%ld): %m", spwd->pw_uid, spwd->pw_gid); + + if (rc < 0) + return ESLURMD_IO_ERROR; + + return SLURM_SUCCESS; +} + + +/* + * Send task exit message for n tasks. 
tid is the list of _local_ + * task ids that have exited + */ +static int +_send_exit_msg(slurmd_job_t *job, int tid[], int n, int status) +{ + int j; + slurm_msg_t resp; + task_exit_msg_t msg; + uint32_t gid[n]; + ListIterator i = NULL; + srun_info_t *srun = NULL; + + debug3("sending task exit msg for %d tasks", n); + + for (j = 0; j < n; j++) + gid[j] = job->task[tid[j]]->gid; + + msg.task_id_list = gid; + msg.num_tasks = n; + msg.return_code = status; + resp.data = &msg; + resp.msg_type = MESSAGE_TASK_EXIT; + + /* + * XXX: Should srun_list be associated with each task? + */ + i = list_iterator_create(job->task[tid[0]]->srun_list); + while ((srun = list_next(i))) { + resp.address = srun->resp_addr; + if (resp.address.sin_family != 0) + slurm_send_only_node_msg(&resp); + } + list_iterator_destroy(i); + + return SLURM_SUCCESS; +} + + +/* + * Executes the functions of the slurmd job manager process, + * which runs as root and performs shared memory and interconnect + * initialization, etc. + * + * Returns 0 if job ran and completed successfully. + * Returns errno if job startup failed. + * + */ +static int +_job_mgr(slurmd_job_t *job) +{ + int rc = 0; + + debug3("Entered job_mgr"); + + if (shm_init() < 0) + goto fail0; + + job_update_shm(job); + + if (!job->batch && (interconnect_preinit(job) < 0)) { + rc = ESLURM_INTERCONNECT_FAILURE; + goto fail1; + } + + _block_most_signals(); + + if ((rc = _setup_io(job))) + goto fail1; + + xsignal(SIGHUP, _hup_handler); + + /* + * Create slurmd session manager and read task pids from pipe + */ + if ((rc = _create_job_session(job))) { + /* + * Get exit code from session manager + */ + if (rc < 0) + rc = _wait_for_session(job); + goto fail2; + } + + /* + * Send job launch response with list of pids + */ + if (!job->batch) + _send_launch_resp(job, 0); + + /* + * Wait for all tasks to exit + */ + _wait_for_task_exit(job); + + /* wait for session to terminate, + * then clean up + */ + _wait_for_session(job); + + fail2: + /* + * Wait for io thread to complete + */ + _wait_for_io(job); + + if (!job->batch && (interconnect_postfini(job) < 0)) + error("interconnect_postfini: %m"); + fail1: + job_delete_shm(job); shm_fini(); - return(SLURM_ERROR); + fail0: + /* If interactive job startup was abnormal, + * be sure to notify client. 
+ */ + if ((rc != 0) && !job->batch) + _send_launch_resp(job, rc); + + return(rc); +} + +/* + * update task information from "job" into shared memory + */ +static int +_update_shm_task_info(slurmd_job_t *job) +{ + int retval = SLURM_SUCCESS; + int i; + + for (i = 0; i < job->ntasks; i++) { + task_t t; + + t.id = i; + t.global_id = job->task[i]->gid; + t.pid = job->task[i]->pid; + t.ppid = job->smgr_pid; + + if (shm_add_task(job->jobid, job->stepid, &t) < 0) { + error("shm_add_task: %m"); + retval = SLURM_ERROR; + } + } + + return retval; +} + +static int +_readn(int fd, void *buf, size_t nbytes) +{ + int n = 0; + char *pbuf = (char *) buf; + size_t nleft = nbytes; + + while (nleft > 0) { + if ((n = read(fd, (void *) pbuf, nleft)) > 0) { + pbuf+=n; + nleft-=n; + } else if (n == 0) /* EOF */ + break; + else if (errno == EINTR) + break; + else { + debug("read: %m"); + break; + } + } + return(n); +} + + +static int +_create_job_session(slurmd_job_t *job) +{ + int i; + int rc = 0; + int fd = job->fdpair[0]; + pid_t spid; + + if ((spid = smgr_create(job)) < (pid_t) 0) { + error("Unable to create session manager: %m"); + return ESLURMD_FORK_FAILED; + } + + job->jmgr_pid = getpid(); + if (shm_update_step_mpid(job->jobid, job->stepid, getpid()) < 0) + error("shm_update_step_mpid: %m"); + + job->smgr_pid = spid; + if (shm_update_step_sid(job->jobid, job->stepid, spid) < 0) + error("shm_update_step_sid: %m"); + + /* + * Read information from session manager slurmd + */ + for (i = 0; i < job->ntasks; i++) { + pid_t *pidptr = &job->task[i]->pid; + + if ((rc = _readn(fd, (void *) pidptr, sizeof(pid_t))) < 0) + error("Error obtaining task information: %m"); + + if (rc == 0) /* EOF, smgr must've died */ + goto error; + } + + _update_shm_task_info(job); + + return SLURM_SUCCESS; + + error: + rc = _wait_for_session(job); + return rc; +} + +static int +_handle_task_exit(slurmd_job_t *job) +{ + int len; + int tid[1]; + exit_status_t e; + + if ((len = _readn(job->fdpair[0], &e, sizeof(e))) < 0) { + error("read from session mgr: %m"); + return SLURM_ERROR; + } + + if (len == 0) /* EOF */ + return len; + + tid[0] = e.taskid; + + debug2("global task %d exited with status %d", tid[0], e.status); + + _send_exit_msg(job, tid, 1, e.status); + + return SLURM_SUCCESS; +} + +/* + * Wait for tasks to exit by reading task exit codes from slurmd + * session manager pipe. On EOF or when waiting == 0, the job is + * complete + */ +static int +_wait_for_task_exit(slurmd_job_t *job) +{ + int rc = 0; + int waiting = job->ntasks; + struct pollfd pfd[1]; + + pfd[0].fd = job->fdpair[0]; + pfd[0].events = POLLIN; + + while (waiting > 0) { + int revents; + + if ((rc = poll(pfd, 1, -1)) < 0) { + if (errno == EINTR) { + _handle_attach_req(job); + continue; + } + } + + revents = pfd[0].revents; + + if (revents & POLLNVAL) + return SLURM_ERROR; + + if ( (revents & POLLERR) + || (revents & POLLHUP) ) { + /* + * smgr exited. 
XXX: Needs work + */ + while (waiting && (_handle_task_exit(job) == 0)) { + waiting--; + } + if (waiting != 0) + return SLURM_ERROR; + else + return SLURM_SUCCESS; + } + + if ((revents & POLLIN) + && (_handle_task_exit(job) == SLURM_SUCCESS)) + waiting--; + } + + return SLURM_SUCCESS; +} + + +/* + * read task exit status from slurmd session manager process, + * then wait for session manager to terminate + */ +static int +_wait_for_session(slurmd_job_t *job) +{ + int status = -1; + pid_t pid; + + while ((pid = waitpid(job->smgr_pid, &status, 0)) < (pid_t) 0) { + if (errno == EINTR) + _handle_attach_req(job); + else { + error("waitpid: %m"); + break; + } + } + + status = WEXITSTATUS(status); + + return (status < MAX_SMGR_EXIT_STATUS) ? exit_errno[status] : status; } +/* + * Wait for IO + */ +static void +_wait_for_io(slurmd_job_t *job) +{ + debug("Waiting for IO"); + io_close_all(job); + + /* + * Wait until IO thread exits + */ + pthread_join(job->ioid, NULL); + + return; +} + + static char * _make_batch_dir(slurmd_job_t *job) { @@ -235,149 +722,35 @@ _setup_batch_env(slurmd_job_t *job, char *nodes) } -int -mgr_launch_batch_job(batch_job_launch_msg_t *msg, slurm_addr *cli) -{ - int rc = 0; - int status = 0; - slurmd_job_t *job; - char *batchdir; - char buf[256]; - - snprintf(buf, sizeof(buf), "[%d]", msg->job_id); - log_set_fpfx(buf); - - /* New process, so must reinit shm */ - if ((rc = shm_init()) < 0) - goto cleanup1; - - if (!(job = job_batch_job_create(msg))) - goto cleanup2; - - /* - * This is now done in _run_job() - */ - /* job_update_shm(job); */ - - _setargs(job, *conf->argv, *conf->argc); - - if ((batchdir = _make_batch_dir(job)) == NULL) - goto cleanup2; - - xfree(job->argv[0]); - - if ((job->argv[0] = _make_batch_script(msg, batchdir)) == NULL) - goto cleanup3; - - if ((rc = _setup_batch_env(job, msg->nodes)) < 0) - goto cleanup; - - status = _run_job(job); - - cleanup: - if (job->argv[0] && (unlink(job->argv[0]) < 0)) - error("unlink(%s): %m", job->argv[0]); - cleanup3: - if (batchdir && (rmdir(batchdir) < 0)) - error("rmdir(%s): %m", batchdir); - xfree(batchdir); - cleanup2: - shm_delete_step(job->jobid, job->stepid); - shm_fini(); - cleanup1: - verbose("job %d completed with slurm_rc = %d, job_rc = %d", - job->jobid, rc, status); - _complete_job(job, rc, status); - return 0; -} - -/* Instance of a slurmd "job" or job step: - * We run: - * interconnect_prepare() : prepare node for interconnect (if any) - * interconnect_init() : initialize interconnect on node - * fork() N tasks --> wait() --> interconnect_fini() - * \ - * `--> interconnect_attach() : attach each proc to interconnect - * interconnect_env() : setup child environment - * exec() - */ -static int -_run_job(slurmd_job_t *job) -{ - int rc = SLURM_SUCCESS; - struct passwd *spwd = getpwuid(geteuid()); - - _block_most_signals(); - - /* Insert job info into shared memory */ - job_update_shm(job); - - if (!job->batch && interconnect_init(job) == SLURM_ERROR) { - error("interconnect_init: %m"); - rc = errno; - goto fail; - } - - if ((rc = io_spawn_handler(job)) < 0) { - rc = ESLURMD_IO_ERROR; - goto fail1; - } - - /* connect job stderr to this node's task 0 stderr so - * user recieves error messages on stderr - */ - _slurmd_job_log_init(job); - - /* - * Temporarily drop permissions - */ - if ((rc = _drop_privileges(job->pwd)) < 0) { - rc = ESLURMD_SET_UID_OR_GID_ERROR; - goto fail2; - } - - /* Open input/output files and/or connections back to client - */ - rc = io_prepare_clients(job); - - if (_reclaim_privileges(spwd) < 
0) - error("sete{u/g}id(%ld/%ld): %m", spwd->pw_uid, spwd->pw_gid); +static void +_send_launch_resp(slurmd_job_t *job, int rc) +{ + int i; + slurm_msg_t resp_msg; + launch_tasks_response_msg_t resp; + srun_info_t *srun = list_peek(job->sruns); + debug("Sending launch resp rc=%d", rc); - if (rc < 0) { - rc = ESLURMD_IO_ERROR; - goto fail2; - } + resp_msg.address = srun->resp_addr; + resp_msg.data = &resp; + resp_msg.msg_type = RESPONSE_LAUNCH_TASKS; - rc = _exec_all_tasks(job); - if (!job->batch) - _send_launch_resp(job, rc); - _wait_for_all_tasks(job); + resp.node_name = conf->hostname; + resp.srun_node_id = job->nodeid; + resp.return_code = rc; + resp.count_of_pids = job->ntasks; - debug2("all tasks exited, waiting on IO"); - io_close_all(job); - pthread_join(job->ioid, NULL); - debug2("IO complete"); + resp.local_pids = xmalloc(job->ntasks * sizeof(*resp.local_pids)); + for (i = 0; i < job->ntasks; i++) + resp.local_pids[i] = job->task[i]->pid; - if (!job->batch) - interconnect_fini(job); /* ignore errors */ - job_delete_shm(job); /* again, ignore errors */ - verbose("job completed, rc = %d", rc); - return rc; + slurm_send_only_node_msg(&resp_msg); -fail2: - io_close_all(job); - pthread_join(job->ioid, NULL); -fail1: - if (!job->batch) - interconnect_fini(job); -fail: - job_delete_shm(job); - if (!job->batch) - _send_launch_resp(job, rc); - return rc; + xfree(resp.local_pids); } + static int _complete_job(slurmd_job_t *job, int err, int status) { @@ -434,6 +807,8 @@ _complete_job(slurmd_job_t *job, int err, int status) return SLURM_SUCCESS; } + + static void _handle_attach_req(slurmd_job_t *job) { @@ -456,37 +831,6 @@ _handle_attach_req(slurmd_job_t *job) io_new_clients(job); } -static void -_hup_handler(int sig) {;} - -static void -_wait_for_all_tasks(slurmd_job_t *job) -{ - int waiting = job->ntasks; - int i; - - xsignal(SIGHUP, _hup_handler); - - while (waiting > 0) { - int status; - pid_t pid = waitpid(0, &status, 0); - if ((pid < (pid_t) 0)) { - if (errno == EINTR) { - _handle_attach_req(job); - continue; - } - error("waitpid: %m"); - /* job_cleanup() */ - } - for (i = 0; i < job->ntasks; i++) { - if (job->task[i]->pid == pid) { - _send_exit_msg(status, job->task[i]); - waiting--; - } - } - } - return; -} static int _drop_privileges(struct passwd *pwd) @@ -542,204 +886,6 @@ _reclaim_privileges(struct passwd *pwd) } - - - -static int -_become_user(slurmd_job_t *job) -{ - if (setgid(job->pwd->pw_gid) < 0) { - error("setgid: %m"); - return -1; - } - - if (initgroups(job->pwd->pw_name, job->pwd->pw_gid) < 0) { - ; - /* error("initgroups: %m"); */ - } - - if (setuid(job->pwd->pw_uid) < 0) { - error("setuid: %m"); - return -1; - } - - return 0; -} - -static void -_task_exec(slurmd_job_t *job, int i) -{ - int rc; - log_options_t opts = LOG_OPTS_STDERR_ONLY; - - io_prepare_child(job->task[i]); - - /* - * Reinitialize slurm log facility to send errors back to client - */ - log_init("slurmd", opts, 0, NULL); - - if ((rc = _become_user(job)) < 0) - exit(rc); - - if (_unblock_all_signals() == SLURM_ERROR) { - error("unable to unblock signals"); - exit(1); - } - - /* attach to interconnect */ - if (!job->batch && (interconnect_attach(job, i) < 0)) { - error("interconnect attach failed: %m"); - exit(1); - } - - if (!job->batch && (interconnect_env(job, i) < 0)) { - error("interconnect_env: %m"); - } - - if (chdir(job->cwd) < 0) { - error("couldn't chdir to `%s': %m: going to /tmp instead", - job->cwd); - if (chdir("/tmp") < 0) { - error("couldn't chdir to /tmp either. 
dying."); - exit(1); - } - } - -#ifdef HAVE_TOTALVIEW - /* Stop the tasks on exec for TotalView to connect */ - if ((job->task_flags & TASK_TOTALVIEW_DEBUG) && - (ptrace(PTRACE_TRACEME, 0, NULL, NULL) == -1)) - error("ptrace: %m"); -#endif - - /* exec the cmdline */ - execve(job->argv[0], job->argv, job->env); - - /* error and clean up if execve() returns: - */ - error("execve(): %s: %m", job->argv[0]); - exit(errno); -} - -static int -_exec_all_tasks(slurmd_job_t *job) -{ - pid_t sid; - int i; - - debug3("%ld entered _launch_tasks", getpid()); - - xsignal(SIGPIPE, SIG_IGN); - - if ((sid = setsid()) < (pid_t) 0) { - error("setsid: %m"); - } - - _block_most_signals(); - - if (shm_update_step_sid(job->jobid, job->stepid, sid) < 0) - error("shm_update_step_sid: %m"); - - debug2("invoking %d tasks", job->ntasks); - - for (i = 0; i < job->ntasks; i++) { - task_t t; - debug2("going to fork task %d", i); - t.id = i; - t.global_id = job->task[i]->gid; - t.ppid = getpid(); - - if ((t.pid = fork()) < 0) { - error("fork: %m"); - return 1; - /* job_cleanup() */ - } else if (t.pid == 0) /* child */ - break; - - /* Parent continues loop: */ - - job->task[i]->pid = t.pid; - - debug2("%ld: forked child process %ld for task %d", - getpid(), (long) t.pid, i); - debug2("going to add task %d to shm", i); - if (shm_add_task(job->jobid, job->stepid, &t) < 0) - error("shm_add_task: %m"); - debug2("task %d added to shm", i); -#ifdef HAVE_TOTALVIEW - /* If task to be debugged, wait for it to stop via - * child's ptrace(PTRACE_TRACEME), then SIGSTOP, and - * ptrace(PTRACE_DETACH). This requires a kernel patch, - * which you probably already have in place for TotalView: - * http://hypermail.idiosynkrasia.net/linux-kernel/ - * archived/2001/week51/1193.html */ - if (job->task_flags & TASK_TOTALVIEW_DEBUG) { - int status; - waitpid(t.pid, &status, WUNTRACED); - if (kill(t.pid, SIGSTOP)) - error("kill %ld: %m", (long) t.pid); - if (ptrace(PTRACE_DETACH, (long) t.pid, NULL, NULL)) - error("ptrace %ld: %m", (long) t.pid); - } -#endif - - } - - if (i == job->ntasks) - return 0; /* _wait_for_all_tasks(job); */ - else - _task_exec(job, i); - - debug3("All tasks exited"); - return 0; -} - -static int -_send_exit_msg(int rc, task_info_t *t) -{ - slurm_msg_t resp; - task_exit_msg_t msg; - uint32_t task_id_list[1]; - ListIterator i; - srun_info_t *srun; - - debug3("sending task exit msg for %d", t->gid); - - /* FIXME:XXX: attempt to combine task IDs in single message */ - task_id_list[0] = t->gid; - msg.task_id_list = task_id_list; - msg.num_tasks = 1; - msg.return_code = rc; - resp.data = &msg; - resp.msg_type = MESSAGE_TASK_EXIT; - - i = list_iterator_create(t->srun_list); - while ((srun = list_next(i))) { - resp.address = srun->resp_addr; - if (resp.address.sin_family != 0) - slurm_send_only_node_msg(&resp); - } - list_iterator_destroy(i); - - return SLURM_SUCCESS; -} - -static int -_unblock_all_signals(void) -{ - sigset_t set; - if (sigfillset(&set)) { - error("sigfillset: %m"); - return SLURM_ERROR; - } - if (sigprocmask(SIG_UNBLOCK, &set, NULL)) { - error("sigprocmask: %m"); - return SLURM_ERROR; - } - return SLURM_SUCCESS; -} - static int _block_most_signals(void) { @@ -762,33 +908,6 @@ _block_most_signals(void) return SLURM_SUCCESS; } -static void -_send_launch_resp(slurmd_job_t *job, int rc) -{ - int i; - slurm_msg_t resp_msg; - launch_tasks_response_msg_t resp; - srun_info_t *srun = list_peek(job->sruns); - - debug("Sending launch resp rc=%d", rc); - - resp_msg.address = srun->resp_addr; - resp_msg.data = &resp; - 
resp_msg.msg_type = RESPONSE_LAUNCH_TASKS; - - resp.node_name = conf->hostname; - resp.srun_node_id = job->nodeid; - resp.return_code = rc; - resp.count_of_pids = job->ntasks; - - resp.local_pids = xmalloc(job->ntasks * sizeof(*resp.local_pids)); - for (i = 0; i < job->ntasks; i++) - resp.local_pids[i] = job->task[i]->pid; - - slurm_send_only_node_msg(&resp_msg); - - xfree(resp.local_pids); -} static void _slurmd_job_log_init(slurmd_job_t *job) @@ -819,54 +938,36 @@ _slurmd_job_log_init(slurmd_job_t *job) log_init(argv0, conf->log_opts, 0, NULL); } -int -run_script(bool prolog, const char *path, uint32_t jobid, uid_t uid) -{ - int status; - pid_t cpid; - char *name = prolog ? "prolog" : "epilog"; - - if (path == NULL || path[0] == '\0') - return 0; - - debug("[job %d] attempting to run %s [%s]", jobid, name, path); - - if (access(path, R_OK | X_OK) < 0) { - debug("Not running %s [%s]: %m", name, path); - return 0; - } - if ((cpid = fork()) < 0) { - error ("executing %s: fork: %m", name); - return -1; - } - if (cpid == 0) { - char *argv[4]; - char **env; - int envc = 0; +/* + * Attempt to change the cmdline argument list for slurmd + * to denote the job/job step that this process is managing. + */ +static void +_setargs(slurmd_job_t *job, char **argv, int argc) +{ + int i; + size_t len = 0; + char *arg = NULL; + for (i = 0; i < argc; i++) + len += strlen(argv[i]) + 1; - env = xmalloc(sizeof(char *)); + if (job->stepid == NO_VAL) + xstrfmtcat(arg, "[%d]", job->jobid); + else + xstrfmtcat(arg, "[%d.%d]", job->jobid, job->stepid); - argv[0] = xstrdup(path); - argv[1] = NULL; + if (len < (strlen(arg) + 7)) + goto done; - env[0] = NULL; - setenvpf(&env, &envc, "SLURM_JOBID=%u", jobid); - setenvpf(&env, &envc, "SLURM_UID=%u", uid); + memset(argv[0], 0, len); + strncpy(argv[0], "slurmd", 6); + strncpy((*argv)+7, arg, strlen(arg)); - execve(path, argv, env); - error("help! 
%m"); - exit(127); - } + done: + xfree(arg); + return; +} - do { - if (waitpid(cpid, &status, 0) < 0) { - if (errno != EINTR) - return -1; - } else - return status; - } while(1); - /* NOTREACHED */ -} diff --git a/src/slurmd/no_interconnect.c b/src/slurmd/no_interconnect.c index 0102a698bb61e1403505c6466c95acc9ef0a74a1..45815b3fbd035f6233261b74edd8c2c73ede9b7a 100644 --- a/src/slurmd/no_interconnect.c +++ b/src/slurmd/no_interconnect.c @@ -28,7 +28,10 @@ #include <src/slurmd/interconnect.h> #include <src/slurmd/setenvpf.h> -#include "src/slurmd/shm.h" +int interconnect_preinit (slurmd_job_t *job) +{ + return SLURM_SUCCESS; +} int interconnect_init (slurmd_job_t *job) { @@ -43,7 +46,7 @@ int interconnect_attach (slurmd_job_t *job, int taskid) /* * Set env variables needed for this interconnect */ -int interconnect_env(slurmd_job_t *job, int taskid) +int interconnect_env (slurmd_job_t *job, int taskid) { int cnt = job->envc; task_info_t *t = job->task[taskid]; @@ -60,7 +63,12 @@ int interconnect_env(slurmd_job_t *job, int taskid) return SLURM_SUCCESS; } -int interconnect_fini(slurmd_job_t *job) +int interconnect_fini (slurmd_job_t *job) +{ + return SLURM_SUCCESS; +} + +int interconnect_postfini (slurmd_job_t *job) { return SLURM_SUCCESS; } diff --git a/src/slurmd/req.c b/src/slurmd/req.c index e1f187b6bafba812d3eaedf0d503d1c5fcae00e6..37b979de0dca10237d7593de388ce356f939eb58 100644 --- a/src/slurmd/req.c +++ b/src/slurmd/req.c @@ -185,7 +185,7 @@ _launch_tasks(launch_tasks_request_msg_t *req, slurm_addr *cli) break; default: debug("created process %ld for job %d.%d", - pid, req->job_id, req->job_step_id); + pid, req->job_id, req->job_step_id); break; } @@ -336,7 +336,7 @@ _rpc_kill_tasks(slurm_msg_t *msg, slurm_addr *cli_addr) if (!(step = shm_get_step(req->job_id, req->job_step_id))) { debug("kill for nonexistent job %d.%d requested", - req->job_id, req->job_step_id); + req->job_id, req->job_step_id); rc = ESLURM_INVALID_JOB_ID; goto done; } @@ -436,7 +436,7 @@ _rpc_reattach_tasks(slurm_msg_t *msg, slurm_addr *cli) memcpy(&resp_msg.address, cli, sizeof(slurm_addr)); slurm_set_addr(&resp_msg.address, req->resp_port, NULL); - if ((step = shm_get_step(req->job_id, req->job_step_id)) < 0) { + if (!(step = shm_get_step(req->job_id, req->job_step_id))) { rc = ESRCH; goto done; } diff --git a/src/slurmd/shm.c b/src/slurmd/shm.c index cd92ae3ce549466ee24610cb17ce9a0b2c90d5e4..11c3c2a375b5411c4889e6cec9f91a9983b28703 100644 --- a/src/slurmd/shm.c +++ b/src/slurmd/shm.c @@ -79,7 +79,7 @@ #define SHM_LOCKNAME "/.slurm.lock" /* Increment SHM_VERSION if format changes */ -#define SHM_VERSION 1004 +#define SHM_VERSION 1005 /* These macros convert shared memory pointers to local memory * pointers and back again. 
Pointers in shared memory are relative @@ -422,13 +422,18 @@ shm_signal_step(uint32_t jobid, uint32_t stepid, uint32_t signal) if ((i = _shm_find_step(jobid, stepid)) >= 0) { s = &slurmd_shm->step[i]; for (t = _taskp(s->task_list); t; t = _taskp(t->next)) { - if (getsid(t->pid) != s->sid) + + if (getsid(t->pid) != s->sid) { + error ("Task pid is not in my session!"); continue; + } + if (t->pid > 0 && kill(t->pid, signo) < 0) { error("kill %d.%d task %d pid %ld: %m", jobid, stepid, t->id, (long)t->pid); retval = errno; } + } } else retval = ESRCH; @@ -486,7 +491,6 @@ shm_get_step_owner(uint32_t jobid, uint32_t stepid) } - /* * Free a job step structure in local memory */ @@ -503,6 +507,21 @@ shm_free_step(job_step_t *step) } while ((t = p)); } +int +shm_update_step_mpid(uint32_t jobid, uint32_t stepid, int mpid) +{ + int i, retval = SLURM_SUCCESS; + _shm_lock(); + if ((i = _shm_find_step(jobid, stepid)) >= 0) + slurmd_shm->step[i].mpid = mpid; + else { + slurm_seterrno(ESRCH); + retval = SLURM_FAILURE; + } + _shm_unlock(); + return retval; +} + int shm_update_step_sid(uint32_t jobid, uint32_t stepid, int sid) { @@ -593,8 +612,8 @@ shm_update_step_addrs(uint32_t jobid, uint32_t stepid, s->io_update = true; debug3("Going to send shm update signal to %ld", - s->sid); - if (kill(s->sid, SIGHUP) < 0) { + s->mpid); + if ((s->mpid > 0) && (kill(s->mpid, SIGHUP) < 0)) { slurm_seterrno(EPERM); retval = SLURM_FAILURE; } @@ -617,8 +636,14 @@ shm_step_addrs(uint32_t jobid, uint32_t stepid, slurm_addr *ioaddr, slurm_addr *respaddr, srun_key_t *key) { int i, retval = SLURM_SUCCESS; - xassert(ioaddr != NULL); + + xassert(jobid >= 0); + xassert(stepid >= 0); + + xassert(ioaddr != NULL); xassert(respaddr != NULL); + xassert(key != NULL); + _shm_lock(); if ((i = _shm_find_step(jobid, stepid)) >= 0) { job_step_t *s = &slurmd_shm->step[i]; diff --git a/src/slurmd/shm.h b/src/slurmd/shm.h index ffe388a11568b5d112180cf0f0b44f343a31fa2b..1220543e8142de0f1c396e42e49e3e0b95e30eff 100644 --- a/src/slurmd/shm.h +++ b/src/slurmd/shm.h @@ -73,7 +73,7 @@ struct task { int global_id; /* global task id */ pid_t pid; /* pid of user process */ pid_t ppid; /* parent pid of user process */ - pid_t mpid; /* manager pid of this task */ + /* reverse pointer back to controlling job step */ job_step_t *job_step; task_t *next; /* next task in this job step */ @@ -83,20 +83,23 @@ struct job_step { uid_t uid; uint32_t jobid; uint32_t stepid; - uint32_t sw_id; /* Switch/Interconnect specific id */ - int ntasks; /* number of tasks in this job */ - pid_t sid; /* Job session id */ - char exec_name[MAXPATHLEN]; /* Executable's pathname */ + uint32_t sw_id; /* Switch/Interconnect specific id */ + int ntasks; /* number of tasks in this job */ + pid_t mpid; /* Job manager pid */ + pid_t sid; /* Job session id (smgr pid) */ + + /* Executable's pathname */ + char exec_name[MAXPATHLEN]; - int io_update; /* srun address has been updated */ - slurm_addr respaddr; /* Addr to send messages to srun on */ - slurm_addr ioaddr; /* Addr to connect to initialize IO */ - srun_key_t key; /* last key from srun client */ + int io_update; /* srun address has been updated */ + slurm_addr respaddr; /* Addr to send messages to srun on */ + slurm_addr ioaddr; /* Addr to connect to initialize IO */ + srun_key_t key; /* last key from srun client */ - job_state_t state; /* Job step status */ - time_t timelimit; /* job time limit */ - task_t *task_list; /* list of this step's tasks */ + job_state_t state; /* Job step status */ + time_t timelimit; /* job time limit */ 
+ task_t *task_list; /* list of this step's tasks */ }; @@ -203,6 +206,11 @@ int shm_add_task(uint32_t jobid, uint32_t stepid, task_t *task); */ int shm_update_step_sid(uint32_t jobid, uint32_t stepid, int sid); +/* + * update job step "manager" pid + */ +int shm_update_step_mpid(uint32_t jobid, uint32_t stepid, int mpid); + /* * update job step state diff --git a/src/slurmd/slurmd.c b/src/slurmd/slurmd.c index b08459ba7dd73822d7f9d05ded2b197970fde373..9563e9c1a8b132c5f4fb6a544b6767f101d89b24 100644 --- a/src/slurmd/slurmd.c +++ b/src/slurmd/slurmd.c @@ -426,11 +426,13 @@ _read_config() path_pubkey = xstrdup(conf->cf.job_credential_public_certificate); + if (!conf->logfile) + conf->logfile = xstrdup(conf->cf.slurmd_logfile); + _free_and_set(&conf->epilog, xstrdup(conf->cf.epilog)); _free_and_set(&conf->prolog, xstrdup(conf->cf.prolog)); _free_and_set(&conf->tmpfs, xstrdup(conf->cf.tmp_fs)); _free_and_set(&conf->spooldir, xstrdup(conf->cf.slurmd_spooldir)); - _free_and_set(&conf->logfile, xstrdup(conf->cf.slurmd_logfile)); _free_and_set(&conf->pidfile, xstrdup(conf->cf.slurmd_pidfile)); _free_and_set(&conf->pubkey, path_pubkey); diff --git a/src/slurmd/smgr.c b/src/slurmd/smgr.c new file mode 100644 index 0000000000000000000000000000000000000000..16d934c9b5f11f8dd8d0475370d94a088910fd24 --- /dev/null +++ b/src/slurmd/smgr.c @@ -0,0 +1,410 @@ +/*****************************************************************************\ + * slurmd/smgr.c - session manager functions for slurmd + * $Id$ + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark A. Grondona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#if HAVE_CONFIG_H +# include "config.h" +#endif + +#include <sys/wait.h> +#include <sys/stat.h> +#include <sys/param.h> +#include <sys/ptrace.h> +#include <unistd.h> +#include <pwd.h> +#include <grp.h> +#include <string.h> + +#if HAVE_STDLIB_H +# include <stdlib.h> +#endif + +#if HAVE_SYS_TYPES_H +# include <sys/types.h> +#endif + +#include <slurm/slurm_errno.h> + +#include "src/common/log.h" +#include "src/common/xsignal.h" + +#include "src/slurmd/smgr.h" +#include "src/slurmd/ulimits.h" +#include "src/slurmd/interconnect.h" +#include "src/slurmd/io.h" + +/* + * Static prototype definitions. 
+ */ +static void _session_mgr(slurmd_job_t *job); +static int _exec_all_tasks(slurmd_job_t *job); +static void _exec_task(slurmd_job_t *job, int i); +static int _become_user(slurmd_job_t *job); +static void _wait_for_all_tasks(slurmd_job_t *job); +static int _send_exit_status(slurmd_job_t *job, int fd, int id, int status); +static int _writen(int fd, void *buf, size_t nbytes); +static int _unblock_all_signals(void); +static void _cleanup_file_descriptors(slurmd_job_t *job); + +/* parallel debugger support */ +static void _pdebug_trace_process(slurmd_job_t *job, pid_t pid); +static void _pdebug_stop_current(slurmd_job_t *job); + +/* + * Create the slurmd session manager process + */ +pid_t +smgr_create(slurmd_job_t *job) +{ + pid_t pid; + switch ((pid = fork())) { + case -1: + error("smgr_create: fork: %m"); + return pid; + break; + case 0: /* child */ + close(job->fdpair[0]); + _session_mgr(job); + /* NOTREACHED */ + break; + } + + /* parent continues here */ + + close(job->fdpair[1]); + + return pid; +} + +static void +_session_mgr(slurmd_job_t *job) +{ + xassert(job != NULL); + + /* _cleanup_file_descriptors(job); */ + + /* + * Call interconnect_init() before becoming user + */ + if (!job->batch && (interconnect_init(job) < 0)) { + error("interconnect_init: %m"); + exit(1); + } + + if (_become_user(job) < 0) + exit(2); + + if (setsid() < (pid_t) 0) { + error("setsid: %m"); + exit(3); + } + + if (chdir(job->cwd) < 0) { + error("couldn't chdir to `%s': %m: going to /tmp instead", + job->cwd); + if (chdir("/tmp") < 0) { + error("couldn't chdir to /tmp either. dying."); + exit(4); + } + } + + if (set_user_limits(job) < 0) { + debug("Unable to set user limits"); + exit(5); + } + + if (_exec_all_tasks(job) < 0) { + debug("exec_all_tasks failed"); + exit(6); + } + + _cleanup_file_descriptors(job); + + _wait_for_all_tasks(job); + + if (!job->batch && (interconnect_fini(job) < 0)) { + error("interconnect_fini: %m"); + exit(1); + } + + exit(SLURM_SUCCESS); +} + +/* Close write end of stdin (at the very least) + */ +static void +_cleanup_file_descriptors(slurmd_job_t *j) +{ + int i; + for (i = 0; i < j->ntasks; i++) { + close(j->task[i]->pin[1]); /* Ignore errors */ + close(j->task[i]->pout[0]); + + /* Leave stderr open for slurmd error logging + */ + } +} + +static int +_become_user(slurmd_job_t *job) +{ + if (setgid(job->pwd->pw_gid) < 0) { + error("setgid: %m"); + return -1; + } + + if (initgroups(job->pwd->pw_name, job->pwd->pw_gid) < 0) { + ; + /* error("initgroups: %m"); */ + } + + if (setuid(job->pwd->pw_uid) < 0) { + error("setuid: %m"); + return -1; + } + + return 0; +} + + +/* Execute N tasks and send pids back to job manager process. 
+ */ +static int +_exec_all_tasks(slurmd_job_t *job) +{ + int i; + int fd = job->fdpair[1]; + + xassert(job != NULL); + xassert(fd >= 0); + + for (i = 0; i < job->ntasks; i++) { + pid_t pid = fork(); + + if (pid < 0) { + error("fork: %m"); + return SLURM_ERROR; + } else if (pid == 0) /* child */ + _exec_task(job, i); + + /* Parent continue: + */ + + debug2("pid %ld forked child process %ld for local task %d", + getpid(), (long) pid, i); + + /* + * Send pid to job manager + */ + if (_writen(fd, (char *)&pid, sizeof(pid_t)) < 0) { + error("unable to update task pid!: %m"); + return SLURM_ERROR; + } + + job->task[i]->pid = pid; + + /* + * Prepare process for attach by parallel debugger + * (if specified and able) + */ + _pdebug_trace_process(job, pid); + } + + return SLURM_SUCCESS; +} + +static void +_exec_task(slurmd_job_t *job, int i) +{ + log_options_t opts = LOG_OPTS_STDERR_ONLY; + + io_prepare_child(job->task[i]); + + /* + * Reinitialize slurm log facility to send errors back to client + */ + log_init("slurmd", opts, 0, NULL); + + if (_unblock_all_signals() < 0) { + error("unable to unblock signals"); + exit(1); + } + + if (!job->batch) { + if (interconnect_attach(job, i) < 0) { + error("Unable to attach to interconnect: %m"); + exit(1); + } + + if (interconnect_env(job, i) < 0) + error("error establishing env for interconnect: %m"); + + _pdebug_stop_current(job); + } + + execve(job->argv[0], job->argv, job->env); + + /* + * error() and clean up if execve() returns: + */ + error("execve(): %s: %m", job->argv[0]); + exit(errno); +} + + + +/* wait for N tasks to exit, reporting exit status back to slurmd mgr + * process over file descriptor fd. + * + */ +static void +_wait_for_all_tasks(slurmd_job_t *job) +{ + int waiting = job->ntasks; + int i = 0; + int id = 0; + int fd = job->fdpair[1]; + + while (waiting > 0) { + int status = 0; + pid_t pid; + + if ((pid = waitpid(0, &status, 0)) < (pid_t) 0) { + if (errno != EINTR) + error("waitpid: %m"); + continue; + } + + for (i = 0; i < job->ntasks; i++) { + if (job->task[i]->pid == pid) { + waiting--; + id = i; + break; + } + } + + _send_exit_status(job, fd, id, status); + status = 0; + } + return; +} + +static int +_send_exit_status(slurmd_job_t *job, int fd, int tid, int status) +{ + exit_status_t e; + int len; + + e.taskid = tid; + e.status = status; + + len = _writen(fd, &e, sizeof(e)); + + debug("task %d exited with status %d", tid, status); + + return len; +} + +/* + * Prepare task for parallel debugger attach + */ +static void +_pdebug_trace_process(slurmd_job_t *job, pid_t pid) +{ +#if HAVE_TOTALVIEW + /* If task to be debugged, wait for it to stop via + * child's ptrace(PTRACE_TRACEME), then SIGSTOP, and + * ptrace(PTRACE_DETACH). 
This requires a kernel patch, + * which you probably already have in place for TotalView: + * http://hypermail.idiosynkrasia.net + * /linux-kernel/archived/2001/week51/1193.html + */ + + if (job->task_flags & TASK_TOTALVIEW_DEBUG) { + int status; + waitpid(pid, &status, WUNTRACED); + if (kill(pid, SIGSTOP) < 0) + error("kill(%ld): %m", (long) pid); + if (ptrace(PTRACE_DETACH, (long) pid, NULL, NULL)) + error("ptrace(%ld): %m", (long) pid); + } +#endif /* HAVE_TOTALVIEW */ +} + +/* + * Stop current task on exec() for connection from a parallel debugger + */ +static void +_pdebug_stop_current(slurmd_job_t *job) +{ +#if HAVE_TOTALVIEW + /* + * Stop the task on exec for TotalView to connect + */ + if ( (job->task_flags & TASK_TOTALVIEW_DEBUG) + && (ptrace(PTRACE_TRACEME, 0, NULL, NULL) < 0) ) + error("ptrace: %m"); +#endif +} + + +static int +_writen(int fd, void *buf, size_t nbytes) +{ + int n = 0; + char *pbuf = (char *) buf; + size_t nleft = nbytes; + + while (nleft > 0) { + if ((n = write(fd, (void *) pbuf, nleft)) >= 0) { + pbuf+=n; + nleft-=n; + } else if (errno == EINTR) + continue; + else { + debug("write: %m"); + break; + } + } + return(n); +} + +static int +_unblock_all_signals(void) +{ + sigset_t set; + if (sigfillset(&set)) { + error("sigfillset: %m"); + return SLURM_ERROR; + } + if (sigprocmask(SIG_UNBLOCK, &set, NULL)) { + error("sigprocmask: %m"); + return SLURM_ERROR; + } + return SLURM_SUCCESS; +} + + diff --git a/src/slurmd/smgr.h b/src/slurmd/smgr.h new file mode 100644 index 0000000000000000000000000000000000000000..f1c1a7a3549c66f74caff0a739377a9e366eadc4 --- /dev/null +++ b/src/slurmd/smgr.h @@ -0,0 +1,64 @@ +/*****************************************************************************\ + * src/slurmd/smgr.h - session manager functions for slurmd + * $Id$ + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#ifndef _SMGR_H +#define _SMGR_H + +#if HAVE_CONFIG_H +# include "config.h" +#endif + +#include <slurm/slurm_errno.h> + +#if HAVE_SYS_TYPES_H +# include <sys/types.h> +#endif /* HAVE_SYS_TYPES_H */ + +#include "src/slurmd/job.h" + +/* + * Task exit code information + */ +typedef struct exit_status { + int taskid; + int status; +} exit_status_t; + + +/* + * Create the session manager process, which starts a new session + * and runs as the UID of the job owner. 
The session manager process + * will wait for all tasks in the job to exit (sending task exit messages + * as appropriate), and then exit itself. + * + * If the smgr process is successfully created, the pid of the new + * process is returned. On error, (pid_t) -1 is returned. + * + */ +pid_t smgr_create(slurmd_job_t *job); + +#endif /* !_SMGR_H */ diff --git a/src/slurmd/ulimits.c b/src/slurmd/ulimits.c new file mode 100644 index 0000000000000000000000000000000000000000..28566dfd8d5d8f440c211d451c385d4af713ffd5 --- /dev/null +++ b/src/slurmd/ulimits.c @@ -0,0 +1,139 @@ +/*****************************************************************************\ + * src/slurmd/ulimits.c - set user limits for job + * $Id$ + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#if HAVE_CONFIG_H +# include "config.h" +#endif + +#include <sys/resource.h> +#include <unistd.h> +#include <stdlib.h> +#include <string.h> + +#include "src/common/log.h" + +#include "src/slurmd/job.h" + +struct userlim { + char *var; + int resource; +}; + +static struct userlim ulims[] = + { { "SLURM_RLIMIT_CORE" , RLIMIT_CORE }, + { "SLURM_RLIMIT_FSIZE" , RLIMIT_FSIZE }, + { "SLURM_RLIMIT_NPROC" , RLIMIT_NPROC }, + { "SLURM_RLIMIT_NOFILE", RLIMIT_NOFILE}, + { NULL, 0 } }; + +/* + * Prototypes: + * + */ +static char * _getenvp(char **env, const char *name); +static long _get_env_val(char **env, const char *name); +static int _set_limit(char **env, struct userlim *ulim); + + +/* + * Set all user limits off environment variables as detailed in + * the local ulims[] var. Sets limits off environment variables + * in job->env. + */ +int set_user_limits(slurmd_job_t *job) +{ + struct userlim *uptr = &ulims[0]; + + while (uptr && (uptr->var != NULL)) { + _set_limit(job->env, uptr); + uptr++; + } + + return SLURM_SUCCESS; +} + +static int +_set_limit(char **env, struct userlim *u) +{ + long val; + int retval = -1; + struct rlimit r; + + if ((val = _get_env_val(env, u->var)) > -2L) { + getrlimit(u->resource, &r); + + r.rlim_cur = (val == -1L) ? 
RLIM_INFINITY : (rlim_t) val; + + if ((retval = setrlimit(u->resource, &r)) < 0) + error("setrlimit(%s, %ld): %m", u->var+5, val); + } + + return retval; +} + + +static long +_get_env_val(char **env, const char *name) +{ + char *val = NULL; + char *p = NULL; + long retval = 0L; + + xassert(env != NULL); + xassert(name != NULL); + + if(!(val = _getenvp(env, name))) + return -2L; + + retval = strtol(val, &p, 10); + + if (p && (*p != '\0')) { + error("Invalid %s env var, value = `%s'", name, val); + return -2L; + } + + return retval; +} + +static char * +_getenvp(char **env, const char *name) +{ + size_t len = strlen(name); + char **ep; + + if ((env == NULL) || (env[0] == '\0')) + return NULL; + + for (ep = env; *ep != NULL; ++ep) { + if (!strncmp(*ep, name, len) && ((*ep)[len] == '=')) + return &(*ep)[len+1]; + } + + return NULL; +} + + diff --git a/src/slurmd/ulimits.h b/src/slurmd/ulimits.h new file mode 100644 index 0000000000000000000000000000000000000000..a89f5f8b794a2d01d5315611ffed5513d66d43f4 --- /dev/null +++ b/src/slurmd/ulimits.h @@ -0,0 +1,38 @@ +/*****************************************************************************\ + * src/slurmd/ulimits.h - functions to set user resource limits in slurmd + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#ifndef _SLURMD_ULIMITS_H +#define _SLURMD_ULIMITS_H + +#include "src/slurmd/job.h" + +/* + * Set user resource limits as defined by SLURM_RLIMIT* environment + * variables contained in job->env + */ +int set_user_limits(slurmd_job_t *job); + +#endif /* !_SLURMD_ULIMITS_H */
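
The preinit/init/fini/postfini split documented in interconnect.h is easiest to see as a call
ordering across the two slurmd processes. The sketch below is illustrative only: the
interconnect_* functions are stand-in stubs rather than the slurmd API, and the setuid,
shared-memory, and task-launch steps are elided; it simply shows which side of the fork each
hook runs on, mirroring the diagram in the header.

/* Minimal sketch of the interconnect hook ordering; stubs only. */
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

static int interconnect_preinit(void)  { puts("preinit  (root, proc 1)"); return 0; }
static int interconnect_init(void)     { puts("init     (proc 2)");       return 0; }
static int interconnect_fini(void)     { puts("fini     (proc 2, user)"); return 0; }
static int interconnect_postfini(void) { puts("postfini (root, proc 1)"); return 0; }

int main(void)
{
	pid_t smgr;

	interconnect_preinit();              /* process 1, before fork       */

	if ((smgr = fork()) < 0)
		return 1;

	if (smgr == 0) {                     /* process 2: session manager   */
		interconnect_init();
		/* setuid/chdir and the fork of user tasks would happen here */
		interconnect_fini();         /* runs as the job owner        */
		_exit(0);
	}

	waitpid(smgr, NULL, 0);              /* back in process 1            */
	interconnect_postfini();             /* root-privileged cleanup      */
	return 0;
}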
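
run_script() handles the prolog/epilog by forking, building a two-variable environment
(SLURM_JOBID and SLURM_UID), execve()ing the script, and collecting the raw wait status in the
parent. A condensed standalone sketch of that pattern follows; the /etc/slurm/prolog path and
the hard-coded job id and uid are examples only, not values from the patch.

/* Sketch of the prolog/epilog fork+execve pattern; illustrative values. */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <errno.h>

static int run_script(const char *path, unsigned jobid, unsigned uid)
{
	pid_t cpid;
	int status;

	if (access(path, R_OK | X_OK) < 0)
		return 0;                       /* silently skip, as slurmd does */

	if ((cpid = fork()) < 0)
		return -1;

	if (cpid == 0) {
		char jobid_var[64], uid_var[64];
		char *argv[] = { (char *) path, NULL };
		char *envp[3];

		snprintf(jobid_var, sizeof(jobid_var), "SLURM_JOBID=%u", jobid);
		snprintf(uid_var, sizeof(uid_var), "SLURM_UID=%u", uid);
		envp[0] = jobid_var;
		envp[1] = uid_var;
		envp[2] = NULL;

		execve(path, argv, envp);
		_exit(127);                     /* only reached if execve fails */
	}

	while (waitpid(cpid, &status, 0) < 0) {
		if (errno != EINTR)
			return -1;
	}
	return status;
}

int main(void)
{
	int rc = run_script("/etc/slurm/prolog", 42, 1000);
	printf("script returned raw status %d\n", rc);
	return 0;
}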
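
mgr.c and smgr.c communicate over the pipe created in job_create() (job->fdpair): the session
manager first writes one pid_t per local task, then one exit_status_t record per task as it
exits, and EOF tells the job manager that the session manager is gone (whose exit code is then
mapped through exit_errno[]). The self-contained sketch below mimics that framing; the readn()
helper, the fake task exit codes, and the use of the raw wait status as the task id are
illustrative assumptions, not the patch's exact code.

/* Sketch of the smgr -> job-manager pipe protocol: pids first, then exit
 * status records.  All names and values here are illustrative only. */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <errno.h>

typedef struct exit_status {            /* same shape as smgr.h's exit_status_t */
	int taskid;
	int status;
} exit_status_t;

/* read exactly nbytes unless EOF or error, like _readn() in mgr.c */
static ssize_t readn(int fd, void *buf, size_t nbytes)
{
	char *p = buf;
	size_t left = nbytes;
	while (left > 0) {
		ssize_t n = read(fd, p, left);
		if (n > 0) { p += n; left -= n; }
		else if (n == 0) break;         /* EOF: writer exited */
		else if (errno == EINTR) continue;
		else return -1;
	}
	return (ssize_t)(nbytes - left);
}

int main(void)
{
	int fdpair[2];
	const int ntasks = 2;
	int i;

	if (pipe(fdpair) < 0) { perror("pipe"); return 1; }

	pid_t smgr = fork();
	if (smgr < 0) { perror("fork"); return 1; }

	if (smgr == 0) {                        /* "session manager" side */
		close(fdpair[0]);
		for (i = 0; i < ntasks; i++) {
			pid_t task = fork();
			if (task == 0)
				_exit(40 + i);  /* fake task */
			if (write(fdpair[1], &task, sizeof(task)) != sizeof(task))
				_exit(1);
		}
		for (i = 0; i < ntasks; i++) {
			int status;
			pid_t pid = wait(&status);
			exit_status_t e = { .taskid = (int) pid, .status = status };
			if (write(fdpair[1], &e, sizeof(e)) != sizeof(e))
				_exit(1);
		}
		_exit(0);
	}

	/* "job manager" side */
	close(fdpair[1]);
	for (i = 0; i < ntasks; i++) {
		pid_t task;
		if (readn(fdpair[0], &task, sizeof(task)) <= 0)
			break;                  /* EOF: smgr died early */
		printf("task %d has pid %ld\n", i, (long) task);
	}
	for (i = 0; i < ntasks; i++) {
		exit_status_t e;
		if (readn(fdpair[0], &e, sizeof(e)) <= 0)
			break;
		printf("task pid %d exited with raw status 0x%x\n", e.taskid, e.status);
	}
	waitpid(smgr, NULL, 0);
	return 0;
}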
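
_setargs() works by overwriting the memory occupied by the original argv strings so that ps(1)
shows which job or job step a slurmd process is managing. Below is a minimal sketch of the same
trick, assuming (as the patch does) that the argv strings are laid out contiguously; the
setargs() helper name and the hard-coded jobid/stepid are invented for illustration. Run it and
inspect the process with `ps -p <pid> -o args=` from another shell.

/* Sketch of rewriting argv[0] to change the visible process title. */
#include <stdio.h>
#include <string.h>
#include <unistd.h>

static void setargs(char **argv, int argc, unsigned jobid, unsigned stepid)
{
	size_t len = 0;
	char tag[64];
	int i;

	/* total space occupied by the original argument strings,
	 * assumed to be contiguous in memory */
	for (i = 0; i < argc; i++)
		len += strlen(argv[i]) + 1;

	snprintf(tag, sizeof(tag), "[%u.%u]", jobid, stepid);

	/* need room for "slurmd " plus the tag inside the existing area */
	if (len < strlen(tag) + 8)
		return;

	memset(argv[0], 0, len);
	strcpy(argv[0], "slurmd ");
	strcat(argv[0], tag);
}

int main(int argc, char **argv)
{
	setargs(argv, argc, 1234, 0);
	pause();        /* observe the new title with ps while blocked */
	return 0;
}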
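
ulimits.c adopts the convention that a SLURM_RLIMIT_<NAME> variable in the job environment
carries the desired soft limit, with -1 meaning RLIM_INFINITY and an unset or unparsable value
ignored. The rough standalone equivalent below reads from the real process environment instead
of job->env and covers only a portable subset of the resources; it is a sketch of the
convention, not the patch's _set_limit()/_getenvp() code. Running it as
`SLURM_RLIMIT_CORE=0 ./a.out` should report a zero core-file limit.

/* Sketch of applying SLURM_RLIMIT_* style environment variables. */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/resource.h>

static int set_one_limit(const char *var, int resource)
{
	const char *val = getenv(var);
	char *end = NULL;
	long lim;
	struct rlimit r;

	if (val == NULL)
		return 0;                       /* nothing requested */

	errno = 0;
	lim = strtol(val, &end, 10);
	if (errno || end == val || *end != '\0') {
		fprintf(stderr, "ignoring bad %s=`%s'\n", var, val);
		return 0;
	}

	if (getrlimit(resource, &r) < 0)
		return -1;

	r.rlim_cur = (lim == -1L) ? RLIM_INFINITY : (rlim_t) lim;

	if (setrlimit(resource, &r) < 0) {
		perror("setrlimit");
		return -1;
	}
	return 0;
}

int main(void)
{
	struct rlimit r;

	set_one_limit("SLURM_RLIMIT_CORE",   RLIMIT_CORE);
	set_one_limit("SLURM_RLIMIT_FSIZE",  RLIMIT_FSIZE);
	set_one_limit("SLURM_RLIMIT_NOFILE", RLIMIT_NOFILE);

	getrlimit(RLIMIT_CORE, &r);
	printf("RLIMIT_CORE soft limit is now %ld\n", (long) r.rlim_cur);
	return 0;
}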