diff --git a/src/api/slurm.h b/src/api/slurm.h index 992288c1d57b85b7a77609d0bd3b486a536cff72..4f106b92037036a22904af6bf685878f765ed9c6 100644 --- a/src/api/slurm.h +++ b/src/api/slurm.h @@ -350,6 +350,7 @@ typedef struct slurm_ctl_conf { char *slurmd_logfile; /* where slurmd error log gets written */ uint32_t slurmd_port; /* default communications port to slurmd */ char *slurmd_spooldir; /* where slurmd put temporary state info */ + char *slurmd_pidfile; /* where to put slurmd pidfile */ uint16_t slurmd_timeout;/* how long slurmctld waits for slurmd before * considering node DOWN */ char *slurm_conf; /* pathname of slurm config file */ diff --git a/src/common/daemonize.c b/src/common/daemonize.c index 07d8de47e5bf23c59d72bf8ceb9a24c0bfbd9aed..45affcef1123076dfb8a33ed58d9c27c098f1c3c 100644 --- a/src/common/daemonize.c +++ b/src/common/daemonize.c @@ -33,6 +33,7 @@ #include "src/common/macros.h" #include "src/common/log.h" +#include "src/common/xassert.h" /* closeall FDs >= a specified value */ static void @@ -80,3 +81,33 @@ daemon(int nochdir, int noclose) return 0; } + +int +create_pidfile(const char *pidfile) +{ + FILE *fp; + + xassert(pidfile != NULL); + xassert(pidfile[0] == '/'); + + if (!(fp = fopen(pidfile, "w"))) { + error("Unable to open pidfile `%s': %m", pidfile); + return -1; + } + if (fprintf(fp, "%d\n", (int) getpid()) == EOF) { + error("Unable to write to pidfile `%s': %m", pidfile); + goto error; + } + if (fclose(fp) == EOF) { + error("Unable to close pidfile `%s': %m", pidfile); + goto error; + } + + return 0; + + error: + if (unlink(pidfile) < 0) + error("Unable to remove pidfile `%s': %m", pidfile); + return -1; +} + diff --git a/src/common/daemonize.h b/src/common/daemonize.h index be1898aefabea9163fa1da0d951b0f4f4c06f532..698974980574564f68d8d72df627e031388bba4a 100644 --- a/src/common/daemonize.h +++ b/src/common/daemonize.h @@ -37,4 +37,8 @@ */ int daemon(int nochdir, int noclose); +/* Write pid into file pidfile + */ +int create_pidfile(char *pidfilename); + #endif /* !_HAVE_DAEMONIZE_H */ diff --git a/src/common/log.c b/src/common/log.c index 336a6a11f84d7820b3bc6848ba54beffd07ffbe8..7f0b7192a546c9eecc510e66ee976886d6ccf742 100644 --- a/src/common/log.c +++ b/src/common/log.c @@ -264,12 +264,15 @@ static char *vxstrfmt(const char *fmt, va_list ap) xslurm_strerrorcat(buf); break; - case 't': /* "%t" => locally date/time*/ + case 't': /* "%t" => locally preferred date/time */ xstrftimecat(buf, "%x %X"); break; - case 'T': /* "%T" => "dd Mon yyyy hh:mm:ss off" */ + case 'T': /* "%T" => "dd Mon yyyy hh:mm:ss off" */ xstrftimecat(buf, "%a %d %b %Y %H:%M:%S %z"); break; + case 'M': /* "%M" => "Mon DD hh:mm:ss" */ + xstrftimecat(buf, "%b %d %T"); + break; case 's': /* "%s" => append string */ /* we deal with this case for efficiency */ if (unprocessed == 0) @@ -411,7 +414,7 @@ static void log_msg(log_level_t level, const char *fmt, va_list args) } if (level <= log->opt.logfile_level && log->logfp != NULL) { - xstrfmtcat(&msgbuf, "[%T] %s%s", pfx, buf); + xstrfmtcat(&msgbuf, "[%M] %s%s", pfx, buf); if (strlen(buf) > 0 && buf[strlen(buf) - 1] == '\n') fprintf(log->logfp, "%s", msgbuf); diff --git a/src/common/read_config.c b/src/common/read_config.c index 6cb659cc92a11d1d192a0eb0e37642c9f105da56..8a6b85753752a50e3fe7e86c933c4a1775d0a83c 100644 --- a/src/common/read_config.c +++ b/src/common/read_config.c @@ -126,7 +126,7 @@ parse_config_spec (char *in_line, slurm_ctl_conf_t *ctl_conf_ptr) char *state_save_location = NULL, *tmp_fs = NULL; char *slurmctld_logfile = NULL, *slurmctld_port = NULL; char *slurmd_logfile = NULL, *slurmd_port = NULL; - char *slurmd_spooldir = NULL; + char *slurmd_spooldir = NULL, *slurmd_pidfile = NULL; char *job_credential_private_key = NULL; char *job_credential_public_certificate = NULL; long first_job_id = -1; @@ -153,6 +153,7 @@ parse_config_spec (char *in_line, slurm_ctl_conf_t *ctl_conf_ptr) "SlurmdLogFile=", 's', &slurmd_logfile, "SlurmdPort=", 's', &slurmd_port, "SlurmdSpoolDir=", 's', &slurmd_spooldir, + "SlurmdPidFile=", 's', &slurmd_pidfile, "SlurmdTimeout=", 'd', &slurmd_timeout, "StateSaveLocation=", 's', &state_save_location, "TmpFS=", 's', &tmp_fs, @@ -316,6 +317,14 @@ parse_config_spec (char *in_line, slurm_ctl_conf_t *ctl_conf_ptr) ctl_conf_ptr->slurmd_spooldir = slurmd_spooldir; } + if ( slurmd_pidfile ) { + if ( ctl_conf_ptr->slurmd_pidfile ) { + error (MULTIPLE_VALUE_MSG, "SlurmdPidFile"); + xfree (ctl_conf_ptr->slurmd_pidfile); + } + ctl_conf_ptr->slurmd_pidfile = slurmd_pidfile; + } + if ( slurmd_timeout != -1) { if ( ctl_conf_ptr->slurmd_timeout != (uint16_t) NO_VAL) error (MULTIPLE_VALUE_MSG, "SlurmdTimeout"); diff --git a/src/slurmd/io.c b/src/slurmd/io.c index b9c253a554bff1f8f5de1a6141b5a4e4d3b9ba3d..0396d6e3007b664f70876ff52c8d9447bb764a5d 100644 --- a/src/slurmd/io.c +++ b/src/slurmd/io.c @@ -104,9 +104,9 @@ struct io_info { static int _io_init_pipes(task_info_t *t); -static void _io_prepare_clients(slurmd_job_t *); -static void _io_prepare_tasks(slurmd_job_t *); -static void _io_prepare_files(slurmd_job_t *); +static int _io_prepare_clients(slurmd_job_t *); +static int _io_prepare_tasks(slurmd_job_t *); +static int _io_prepare_files(slurmd_job_t *); static void * _io_thr(void *); static int _io_write_header(struct io_info *, srun_info_t *); static void _io_connect_objs(io_obj_t *, io_obj_t *); @@ -121,7 +121,7 @@ static void _io_client_attach(io_obj_t *, io_obj_t *, io_obj_t *, static struct io_obj * _io_obj_create(int fd, void *arg); static struct io_info * _io_info_create(uint32_t id); -static struct io_obj * _io_obj(slurmd_job_t *, int, uint32_t, int); +static struct io_obj * _io_obj(slurmd_job_t *, task_info_t *, int, int); static void * _io_thr(void *arg); @@ -142,6 +142,7 @@ static int _client_read(io_obj_t *, List); static int _task_error(io_obj_t *, List); static int _client_error(io_obj_t *, List); static int _connecting_write(io_obj_t *, List); +static int _file_write(io_obj_t *, List); /* Task Output operations (TASK_STDOUT, TASK_STDERR) * These objects are never writable -- @@ -208,10 +209,9 @@ io_spawn_handler(slurmd_job_t *job) } /* create task IO objects and append these to the objs list - * - * XXX check for errors? */ - _io_prepare_tasks(job); + if (_io_prepare_tasks(job) < 0) + return SLURM_FAILURE; if ((errno = pthread_attr_init(&attr)) != 0) error("pthread_attr_init: %m"); @@ -227,10 +227,13 @@ io_spawn_handler(slurmd_job_t *job) /* open 2*ntask initial connections or files for stdout/err * append these to objs list */ - if (list_count(job->sruns) > 0) - _io_prepare_clients(job); - _io_prepare_files(job); + if ((list_count(job->sruns) > 0) && (_io_prepare_clients(job) < 0)) + return SLURM_FAILURE; + if (_io_prepare_files(job) < 0) + slurm_seterrno_ret(ESCRIPT_OPEN_OUTPUT_FAILED); + + return 0; } @@ -292,7 +295,7 @@ _io_thr(void *arg) return (void *)1; } -static void +static int _io_prepare_tasks(slurmd_job_t *job) { int i; @@ -302,29 +305,31 @@ _io_prepare_tasks(slurmd_job_t *job) for (i = 0; i < job->ntasks; i++) { t = job->task[i]; - t->in = _io_obj(job, t->pin[1], t->gid, TASK_STDIN ); + t->in = _io_obj(job, t, t->pin[1], TASK_STDIN ); list_append(job->objs, (void *)t->in ); - t->out = _io_obj(job, t->pout[0], t->gid, TASK_STDOUT); + t->out = _io_obj(job, t, t->pout[0], TASK_STDOUT); list_append(job->objs, (void *)t->out); /* "ghost" stdout client buffers task data without sending * it anywhere */ - obj = _io_obj(job, -1, t->gid, CLIENT_STDOUT); + obj = _io_obj(job, t, -1, CLIENT_STDOUT); _io_client_attach(obj, t->out, NULL, job->objs); - t->err = _io_obj(job, t->perr[0], t->gid, TASK_STDERR); + t->err = _io_obj(job, t, t->perr[0], TASK_STDERR); list_append(job->objs, (void *)t->err); /* "fake" stderr client buffers task data without sending * it anywhere */ - obj = _io_obj(job, -1, t->gid, CLIENT_STDERR); + obj = _io_obj(job, t, -1, CLIENT_STDERR); _io_client_attach(obj, t->err, NULL, job->objs); } xassert(_validate_io_list(job->objs)); + + return SLURM_SUCCESS; } /* @@ -361,7 +366,7 @@ _io_add_connecting(slurmd_job_t *job, task_info_t *t, srun_info_t *srun, fd_set_nonblocking(sock); fd_set_close_on_exec(sock); - obj = _io_obj(job, sock, t->gid, type); + obj = _io_obj(job, t, sock, type); obj->ops = &connecting_client_ops; _io_write_header(obj->arg, srun); list_append(job->objs, (void *)obj); @@ -370,7 +375,7 @@ _io_add_connecting(slurmd_job_t *job, task_info_t *t, srun_info_t *srun, /* * create initial client objs for N tasks */ -static void +static int _io_prepare_clients(slurmd_job_t *job) { int i; @@ -380,7 +385,7 @@ _io_prepare_clients(slurmd_job_t *job) srun = list_peek(job->sruns); if (srun->noconnect) - return; + return SLURM_SUCCESS; /* * connect back to clients for stdin/out/err @@ -392,6 +397,8 @@ _io_prepare_clients(slurmd_job_t *job) /* kick IO thread */ pthread_kill(job->ioid, SIGHUP); } + + return SLURM_SUCCESS; } static int @@ -422,15 +429,15 @@ _open_output_file(slurmd_job_t *job, task_info_t *t, slurmd_io_type_t type) if ((fd = _open_task_file(fname, flags)) > 0) { verbose("opened `%s' for %s fd %d", fname, _io_str[type], fd); - obj = _io_obj(job, fd, t->gid, type); + obj = _io_obj(job, t, fd, type); _obj_set_unreadable(obj); + obj->ops->handle_write = &_file_write; xassert(obj->ops->writable != NULL); if (type == CLIENT_STDOUT) _io_client_attach(obj, t->out, NULL, job->objs); else _io_client_attach(obj, t->err, NULL, job->objs); - } else - error("Unable to open `%s': %m", fname); + } _validate_io_list(job->objs); @@ -445,28 +452,33 @@ _open_stdin_file(slurmd_job_t *job, task_info_t *t) int flags = O_RDONLY; if ((fd = _open_task_file(t->ifname, flags)) > 0) { - obj = _io_obj(job, fd, t->gid, CLIENT_STDIN); + obj = _io_obj(job, t, fd, CLIENT_STDIN); _obj_set_unwritable(obj); _io_client_attach(obj, NULL, t->in, job->objs); } return fd; } -static void +static int _io_prepare_files(slurmd_job_t *job) { int i; if (!job->ofname && !job->efname && !job->ifname) - return; + return SLURM_SUCCESS; for (i = 0; i < job->ntasks; i++) { - _open_output_file(job, job->task[i], CLIENT_STDOUT); - _open_output_file(job, job->task[i], CLIENT_STDERR); - if (job->ifname) - _open_stdin_file (job, job->task[i]); + if (_open_output_file(job, job->task[i], CLIENT_STDOUT) < 0) + return SLURM_FAILURE; + if (_open_output_file(job, job->task[i], CLIENT_STDERR) < 0) + return SLURM_FAILURE; + if (job->ifname && (_open_stdin_file(job, job->task[i]) < 0)) + return SLURM_FAILURE; + pthread_kill(job->ioid, SIGHUP); } + + return SLURM_SUCCESS; } /* Attach io obj "client" as a reader of 'writer' and a writer to 'reader' @@ -587,7 +599,9 @@ _io_disconnect(struct io_info *src, struct io_info *dst) { char *a, *b; xassert(src->magic == IO_MAGIC); + xassert(src->readers != NULL); xassert(dst->magic == IO_MAGIC); + xassert(dst->writers != NULL); a = _io_str[dst->type]; b = _io_str[src->type]; @@ -618,7 +632,7 @@ _io_disconnect_client(struct io_info *client, List objs) while ((t = list_next(i))) { if (list_count(t->readers) > 1) { destroy = true; - _io_disconnect(client, t); + _io_disconnect(t, client); } } list_iterator_destroy(i); @@ -630,7 +644,7 @@ _io_disconnect_client(struct io_info *client, List objs) i = list_iterator_create(client->readers); while ((t = list_next(i))) { if (list_count(t->writers) > 1) { - _io_disconnect(t, client); + _io_disconnect(client, t); } } list_iterator_destroy(i); @@ -675,9 +689,9 @@ _ops_destroy(struct io_operations *ops) } io_obj_t * -_io_obj(slurmd_job_t *job, int fd, uint32_t id, int type) +_io_obj(slurmd_job_t *job, task_info_t *t, int fd, int type) { - struct io_info *io = _io_info_create(id); + struct io_info *io = _io_info_create(t->gid); struct io_obj *obj = _io_obj_create(fd, (void *)io); xassert(io->magic == IO_MAGIC); @@ -717,7 +731,7 @@ _io_obj(slurmd_job_t *job, int fd, uint32_t id, int type) */ io->obj = obj; io->job = job; - io->task = job->task[io->id - job->task[0]->gid]; + io->task = t; xassert(io->task->gid == io->id); @@ -972,6 +986,16 @@ _write(io_obj_t *obj, List objs) return 0; } +/* flush after writing data to file + */ +static int +_file_write(io_obj_t *obj, List objs) +{ + int rc = _write(obj, objs); + fdatasync(obj->fd); + return rc; +} + static void _do_attach(struct io_info *io) { diff --git a/src/slurmd/job.c b/src/slurmd/job.c index 22713a70b8e75efee3a38c3e4ba62a351f68a4d5..a15d2ace289c95f08a8780a9c024c9745f55c12d 100644 --- a/src/slurmd/job.c +++ b/src/slurmd/job.c @@ -376,7 +376,10 @@ job_update_shm(slurmd_job_t *job) if (shm_insert_step(&s) < 0) error("Updating shmem with new step info: %m"); - verbose("shm_insert job %d.%d", job->jobid, job->stepid); + if (job->stepid == NO_VAL) + debug("updated shm with job %d", job->jobid); + else + debug("updated shm with step %d.%d", job->jobid, job->stepid); } void diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c index 48b89907b42bcc98836fb7fef40334d4a80f4fa2..05dca389f5a9e86a4c0a475dd2d8fb292d63f305 100644 --- a/src/slurmd/mgr.c +++ b/src/slurmd/mgr.c @@ -68,7 +68,7 @@ static int _seteuid_and_chdir(slurmd_job_t *job); static int _setuid(slurmd_job_t *job); static int _unblock_all_signals(void); static int _send_exit_msg(int rc, task_info_t *t); -static int _complete_job(slurmd_job_t *job); +static int _complete_job(slurmd_job_t *job, int rc, int status); /* Launch a job step on this node */ @@ -94,7 +94,7 @@ mgr_launch_tasks(launch_tasks_request_msg_t *msg, slurm_addr *cli) shm_fini(); return(SLURM_SUCCESS); error: - job_error(job, "cannot run job"); + job_error(job, "cannot run job: %m"); shm_fini(); return(SLURM_ERROR); } @@ -111,7 +111,6 @@ _make_batch_dir(slurmd_job_t *job) goto error; } - if (chown(path, (uid_t) -1, (gid_t) job->pwd->pw_gid) < 0) { error("chown(%s): %m", path); goto error; @@ -170,63 +169,75 @@ _make_batch_script(batch_job_launch_msg_t *msg, char *path) } -static void +static int _setup_batch_env(slurmd_job_t *job, char *nodes) { char buf[1024]; int envc; hostlist_t hl = hostlist_create(nodes); + if (!hl) + return SLURM_ERROR; envc = (int)job->envc; + hostlist_ranged_string(hl, 1024, buf); setenvpf(&job->env, &envc, "SLURM_JOBID=%u", job->jobid); - if (hl) { - hostlist_ranged_string(hl, 1024, buf); - setenvpf(&job->env, &envc, "SLURM_NNODES=%u", - hostlist_count(hl)); - setenvpf(&job->env, &envc, "SLURM_NODELIST=%s", buf); - hostlist_destroy(hl); - } + setenvpf(&job->env, &envc, "SLURM_NNODES=%u", hostlist_count(hl)); + setenvpf(&job->env, &envc, "SLURM_NODELIST=%s", buf); + hostlist_destroy(hl); job->envc = envc; + + return 0; } int mgr_launch_batch_job(batch_job_launch_msg_t *msg, slurm_addr *cli) { + int rc = 0; + int status = 0; slurmd_job_t *job; char *batchdir; /* New process, so must reinit shm */ - if (shm_init() < 0) - goto cleanup; + if ((rc = shm_init()) < 0) + goto cleanup1; if (!(job = job_batch_job_create(msg))) - goto cleanup; + goto cleanup2; + + job_update_shm(job); if ((batchdir = _make_batch_dir(job)) == NULL) - goto cleanup; + goto cleanup2; if (job->argv[0]) xfree(job->argv[0]); if ((job->argv[0] = _make_batch_script(msg, batchdir)) == NULL) - goto cleanup; + goto cleanup3; - _setup_batch_env(job, msg->nodes); + if ((rc = _setup_batch_env(job, msg->nodes)) < 0) + goto cleanup; - _run_batch_job(job); + status = _run_batch_job(job); cleanup: - _complete_job(job); - shm_fini(); if (job->argv[0] && (unlink(job->argv[0]) < 0)) error("unlink(%s): %m", job->argv[0]); + cleanup3: if (batchdir && (rmdir(batchdir) < 0)) error("rmdir(%s): %m", batchdir); xfree(batchdir); + cleanup2: + shm_delete_step(job->jobid, job->stepid); + shm_fini(); + cleanup1: + verbose("job %d completed with slurm_rc = %d, job_rc = %d", + job->jobid, rc, status); + _complete_job(job, rc, status); return 0; } @@ -255,23 +266,23 @@ _run_job(slurmd_job_t *job) * Need to detach from shared memory * We don't know what will happen in interconnect_init() */ - shm_fini(); + /* shm_fini(); */ if (interconnect_init(job) == SLURM_ERROR) { job_error(job, "interconnect_init: %m"); rc = -2; - shm_init(); + /* shm_init(); */ goto done; } /* Reattach to shared memory after interconnect is initialized */ - job_debug(job, "%ld reattaching to shm", getpid()); - if (shm_init() < 0) { + /* job_debug(job, "%ld reattaching to shm", getpid()); */ + /* if (shm_init() < 0) { job_error(job, "unable to reattach to shm: %m"); rc = -1; goto done; - } + }*/ /* initialize I/O, connect back to srun, and spawn thread for * forwarding I/O. @@ -280,7 +291,7 @@ _run_job(slurmd_job_t *job) /* Temporarily drop permissions and attempt to chdir() * */ - if ((rc = _seteuid_and_chdir(job)) < 0) + if ((rc = _seteuid_and_chdir(job)) < 0) goto done; /* Option: connect slurmd stderr to srun local task 0: stderr? */ @@ -291,18 +302,18 @@ _run_job(slurmd_job_t *job) } if ((seteuid(suid) < 0) || (setegid(sgid) < 0)) - error("seteuid(0): %m"); + error("sete{u/g}id(%ld/%ld): %m", suid, sgid); _exec_all_tasks(job); - job_debug(job, "job complete, waiting on IO"); + job_debug2(job, "job complete, waiting on IO"); io_close_all(job); pthread_join(job->ioid, NULL); - job_debug(job, "IO complete"); + job_debug2(job, "IO complete"); -done: + done: interconnect_fini(job); /* ignore errors */ job_delete_shm(job); /* again, ignore errors */ - job_verbose(job, "completed"); + job_verbose(job, "job complete with rc = %d", rc); if (rc < 0) { for (i = 0; i < job->ntasks; i++) _send_exit_msg(-rc, job->task[i]); @@ -311,7 +322,7 @@ done: } static int -_complete_job(slurmd_job_t *job) +_complete_job(slurmd_job_t *job, int err, int status) { int rc; size_t size; @@ -323,8 +334,8 @@ _complete_job(slurmd_job_t *job) req.job_id = job->jobid; req.job_step_id = NO_VAL; - req.job_rc = 0; - req.slurm_rc = SLURM_SUCCESS; + req.job_rc = status; + req.slurm_rc = err; req.node_name = conf->hostname; msg.msg_type = REQUEST_COMPLETE_JOB_STEP; msg.data = &req; @@ -369,14 +380,12 @@ _complete_job(slurmd_job_t *job) static int _run_batch_job(slurmd_job_t *job) { - int rc; + int status = 0; + int rc = 0; task_t t; pid_t sid, pid; - int status; - gid_t sgid = getgid(); - uid_t suid = getuid(); - - job_update_shm(job); + gid_t sgid = getgid(); + uid_t suid = getuid(); /* Temporarily drop permissions to initiate * IO thread. This will ensure that calling user @@ -385,26 +394,31 @@ _run_batch_job(slurmd_job_t *job) */ _seteuid_and_chdir(job); - if (io_spawn_handler(job) == SLURM_ERROR) { - job_error(job, "unable to spawn io handler"); - rc = SLURM_ERROR; - goto done; - } + rc = io_spawn_handler(job); /* seteuid/gid back to saved uid/gid */ if ((seteuid(suid) < 0) || (setegid(sgid) < 0)) { - fatal("set{e/g}uid(%ld/%ld) : %m", suid, sgid); + error("set{e/g}uid(%ld/%ld) : %m", suid, sgid); + return ESLURMD_SET_UID_OR_GID_ERROR; } + /* Give up if we couldn't spawn IO handler for whatever reason + */ + if (rc < 0) + return ESLURMD_CANNOT_SPAWN_IO_THREAD; + xsignal(SIGPIPE, SIG_IGN); if ((sid = setsid()) < (pid_t) 0) { error("job %d: setsid: %m", job->jobid); + return ESLURMD_SET_SID_ERROR; } - if (shm_update_step_sid(job->jobid, job->stepid, sid) < 0) + if (shm_update_step_sid(job->jobid, job->stepid, sid) < 0) { error("job %d: shm_update_step_sid: %m", job->jobid); + return ESLURMD_SHARED_MEMORY_ERROR; + } t.id = 0; t.global_id = 0; @@ -412,8 +426,7 @@ _run_batch_job(slurmd_job_t *job) if ((t.pid = fork()) < 0) { error("fork: %m"); - exit(1); - /* job_cleanup() */ + return ESLURMD_FORK_FAILED; } else if (t.pid == 0) /* child */ _task_exec(job, 0, true); @@ -421,13 +434,13 @@ _run_batch_job(slurmd_job_t *job) job->task[0]->pid = t.pid; - if (shm_add_task(job->jobid, job->stepid, &t) < 0) + if (shm_add_task(job->jobid, job->stepid, &t) < 0) { job_error(job, "shm_add_task: %m"); + return ESLURMD_SHARED_MEMORY_ERROR; + } while ((pid = waitpid(0, &status, 0)) < 0 && (pid != t.pid)) { - if (pid > 0) - continue; - if (errno == EINTR) + if ((pid > 0) || (errno == EINTR)) continue; else error("waitpid: %m"); @@ -439,12 +452,7 @@ _run_batch_job(slurmd_job_t *job) io_close_all(job); pthread_join(job->ioid, NULL); - _complete_job(job); - -done: - shm_delete_step(job->jobid, job->stepid); - return rc; - + return status; } static void diff --git a/src/slurmd/req.c b/src/slurmd/req.c index e968e2241fccd64c41c225b5afbcfa7ace831fda..82eddcbf0fa54741398884de82a5d689fce6087b 100644 --- a/src/slurmd/req.c +++ b/src/slurmd/req.c @@ -124,7 +124,7 @@ _launch_batch_job(batch_job_launch_msg_t *req, slurm_addr *cli) /* NOTREACHED */ break; default: - verbose("created process %ld for job %d", + debug("created process %ld for job %d", pid, req->job_id); break; } @@ -156,7 +156,7 @@ _launch_tasks(launch_tasks_request_msg_t *req, slurm_addr *cli) /* NOTREACHED */ break; default: - verbose("created process %ld for job %d.%d", + debug("created process %ld for job %d.%d", pid, req->job_id, req->job_step_id); break; } @@ -181,7 +181,7 @@ _rpc_launch_tasks(slurm_msg_t *msg, slurm_addr *cli) req_uid = slurm_auth_uid(msg->cred); req_gid = slurm_auth_gid(msg->cred); - verbose("launch tasks request from %ld@%s", req_uid, host, port); + info("launch tasks request from %ld@%s", req_uid, host); rc = verify_credential(&conf->vctx, req->credential, @@ -190,7 +190,7 @@ _rpc_launch_tasks(slurm_msg_t *msg, slurm_addr *cli) if ((rc == SLURM_SUCCESS) && (req_uid == req->uid)) rc = _launch_tasks(req, cli); else { - verbose("Invalid credential from %ld@%s, launching job anyway", + info("XXX: Invalid credential from %ld@%s, launching job anyway", req_uid, host); rc = _launch_tasks(req, cli); } @@ -222,14 +222,12 @@ _rpc_batch_job(slurm_msg_t *msg, slurm_addr *cli) req_uid = slurm_auth_uid(msg->cred); req_gid = slurm_auth_gid(msg->cred); - verbose("req_uid = %ld, req->uid = %ld", req_uid, req->uid); - if ((req_uid != 0) && (req_uid != (uid_t)req->uid)) { rc = EPERM; goto done; } - verbose("batch launch request from %ld@%s", req_uid, host, port); + info("batch launch request from %ld@%s", req_uid, host); if (_launch_batch_job(req, cli) < 0) rc = SLURM_FAILURE; diff --git a/src/slurmd/shm.c b/src/slurmd/shm.c index 9b16922a40d1959ebc130a12366c48e5e7691f5c..6d8670ec20877e9b1f4a51a8deb4af45fd345661 100644 --- a/src/slurmd/shm.c +++ b/src/slurmd/shm.c @@ -58,16 +58,17 @@ #include <stdlib.h> #include <signal.h> -#include <src/common/list.h> -#include <src/common/log.h> -#include <src/common/xmalloc.h> -#include <src/common/xassert.h> -#include <src/common/slurm_errno.h> +#include "src/common/list.h" +#include "src/common/log.h" +#include "src/common/xmalloc.h" +#include "src/common/xassert.h" +#include "src/common/slurm_errno.h" -#include <src/slurmd/shm.h> +#include "src/slurmd/slurmd.h" +#include "src/slurmd/shm.h" /* We use Chris Dunlap's POSIX semaphore implementation if necessary */ -#include <src/slurmd/semaphore.h> +#include "src/slurmd/semaphore.h" #define MAX_JOB_STEPS 16 #define MAX_BATCH_JOBS 128 @@ -118,7 +119,7 @@ static pid_t attach_pid = (pid_t) 0; */ static int _is_valid_ipc_name(const char *name); static char *_create_ipc_name(const char *name); -static int _shm_unlink_lock(void); +static int _shm_unlink_lock(void); static int _shm_lock_and_initialize(void); static void _shm_lock(void); static void _shm_unlock(void); @@ -155,9 +156,12 @@ shm_fini(void) debug3("%ld calling shm_fini() (attached by %ld)", getpid(), attach_pid); - xassert(attach_pid == getpid()); + /* xassert(attach_pid == getpid()); */ - if ((attach_pid == getpid()) && (--slurmd_shm->users == 0)) + /* if ((attach_pid == getpid()) && (--slurmd_shm->users == 0)) + * destroy = 1; + */ + if (--slurmd_shm->users == 0) destroy = 1; /* detach segment from local memory */ @@ -272,7 +276,7 @@ _create_ipc_name(const char *name) #if defined(POSIX_IPC_PREFIX) && defined(HAVE_POSIX_SEMS) dir = POSIX_IPC_PREFIX; #else - if (!(dir = getenv("TMPDIR")) || !strlen(dir)) + if (!(dir = conf->spooldir) || !(dir = getenv("TMPDIR")) || !strlen(dir)) dir = "/tmp"; #endif /* POSIX_IPC_PREFIX */ @@ -290,7 +294,7 @@ _create_ipc_name(const char *name) static int _shm_unlink_lock() { - verbose("process %ld removing shm lock", getpid()); + debug("process %ld removing shm lock", getpid()); if (sem_unlink(lockname) == -1) return 0; xfree(lockname); @@ -643,7 +647,7 @@ shm_add_task(uint32_t jobid, uint32_t stepid, task_t *task) } s = &slurmd_shm->step[i]; - debug2("adding task %d to step %d.%d", task->id, jobid, stepid); + debug3("adding task %d to step %d.%d", task->id, jobid, stepid); if (_shm_find_task_in_step(s, task->id)) { _shm_unlock(); @@ -815,7 +819,7 @@ _shm_reopen() if ((shm_lock = _sem_open(SHM_LOCKNAME, 0)) == SEM_FAILED) { if (errno == ENOENT) { - info("Lockfile found but semaphore deleted:" + debug("Lockfile found but semaphore deleted:" " creating new shm segment"); shm_cleanup(); return _shm_lock_and_initialize(); @@ -855,7 +859,9 @@ _shm_reopen() static int _shm_lock_and_initialize() { - if (slurmd_shm && slurmd_shm->version == SHM_VERSION) { + if (slurmd_shm + && (slurmd_shm->version == SHM_VERSION) + && (shm_lock != SEM_FAILED)) { /* we've already opened shared memory */ _shm_lock(); if (attach_pid != getpid()) { diff --git a/src/slurmd/slurmd.c b/src/slurmd/slurmd.c index 48df303dd57dfe4d19b18ce5c4214e220fa49e9a..e623915b339d609a24caddc65a6620ba301cc266 100644 --- a/src/slurmd/slurmd.c +++ b/src/slurmd/slurmd.c @@ -26,7 +26,7 @@ \*****************************************************************************/ #if HAVE_CONFIG_H -# include <config.h> +# include "config.h" #endif #include <string.h> @@ -38,32 +38,33 @@ #include <sys/resource.h> #include <unistd.h> -#include <src/common/log.h> -#include <src/common/read_config.h> -#include <src/common/xmalloc.h> -#include <src/common/xstring.h> -#include <src/common/slurm_protocol_api.h> -#include <src/common/xsignal.h> -#include <src/common/credential_utils.h> -#include <src/common/signature_utils.h> -#include <src/common/parse_spec.h> -#include <src/common/hostlist.h> -#include <src/common/fd.h> - -#include <src/slurmd/slurmd.h> -#include <src/slurmd/req.h> -#include <src/slurmd/shm.h> -#include <src/slurmd/get_mach_stat.h> +#include "src/common/log.h" +#include "src/common/read_config.h" +#include "src/common/xmalloc.h" +#include "src/common/xstring.h" +#include "src/common/xsignal.h" +#include "src/common/daemonize.h" +#include "src/common/credential_utils.h" +#include "src/common/signature_utils.h" +#include "src/common/slurm_protocol_api.h" +#include "src/common/parse_spec.h" +#include "src/common/hostlist.h" +#include "src/common/macros.h" +#include "src/common/fd.h" + +#include "src/slurmd/slurmd.h" +#include "src/slurmd/req.h" +#include "src/slurmd/shm.h" +#include "src/slurmd/get_mach_stat.h" #define GETOPT_ARGS "L:f:Dvhc" #ifndef MAXHOSTNAMELEN -#define MAXHOSTNAMELEN 64 +# define MAXHOSTNAMELEN 64 #endif -#ifndef MAX -# define MAX(x,y) (((x) >= (y)) ? (x) : (y)) -#endif /* !MAX */ +#define DEFAULT_SPOOLDIR "/tmp" +#define DEFAULT_PIDFILE "/var/run/slurmd.pid" typedef struct connection { slurm_fd fd; @@ -88,12 +89,12 @@ static int _slurmd_init(); static int _slurmd_fini(); static void _create_conf(); static void _init_conf(); +static void _print_conf(); static void _read_config(); +static void _set_slurmd_spooldir(void); static void _usage(); static void _handle_connection(slurm_fd fd, slurm_addr *client); static void *_service_connection(void *); -static void _setdir(void); -static int _mkdir2 (char * path, int modes); static void _fill_registration_msg(slurm_node_registration_status_msg_t *); static slurm_ctl_conf_t slurmctld_conf; @@ -104,15 +105,17 @@ main (int argc, char *argv[]) _create_conf(); _init_conf(); _process_cmdline(argc, argv); - log_init(argv[0], conf->log_opts, LOG_DAEMON, conf->logfile); _read_config(); - _create_msg_socket(); + _print_conf(); + _set_slurmd_spooldir(); - if (conf->daemonize) { + if (conf->daemonize) daemon(0,0); - _setdir(); - } + create_pidfile(DEFAULT_PIDFILE); + log_init(argv[0], conf->log_opts, LOG_DAEMON, conf->logfile); + info("%s started on %T", xbasename(argv[0])); + _create_msg_socket(); conf->pid = getpid(); if (_slurmd_init() < 0) @@ -328,40 +331,46 @@ _free_and_set(char **confvar, char *newval) static void _read_config() { - read_slurm_conf_ctl (&slurmctld_conf); + read_slurm_conf_ctl(&slurmctld_conf); - /* If a parameter was set on the execute line, don't reset it from the config file */ + /* If a parameter was set on the execute line, + * don't reset it from the config file + */ if (conf->conffile == NULL) - _free_and_set(&conf->conffile, slurmctld_conf.slurm_conf ); - if ((conf->logfile == NULL) && (slurmctld_conf.slurmd_logfile)) { - conf->log_opts.logfile_level = MAX (conf->log_opts.logfile_level, - conf->log_opts.stderr_level); - conf->log_opts.logfile_level = MAX (conf->log_opts.logfile_level, - conf->log_opts.syslog_level); - if (conf->daemonize) { - info ("Routing all log messages to %s", slurmctld_conf.slurmd_logfile); - conf->log_opts.stderr_level = LOG_LEVEL_QUIET; + _free_and_set(&conf->conffile, slurmctld_conf.slurm_conf); + + if ((conf->logfile == NULL) && (slurmctld_conf.slurmd_logfile)) + _free_and_set(&conf->logfile, slurmctld_conf.slurmd_logfile ); + + if (conf->daemonize) { + conf->log_opts.stderr_level = LOG_LEVEL_QUIET; + if (conf->logfile) conf->log_opts.syslog_level = LOG_LEVEL_QUIET; - } - _free_and_set(&conf->logfile, slurmctld_conf.slurmd_logfile ); - log_init(conf->prog, conf->log_opts, LOG_DAEMON, conf->logfile); } conf->port = slurmctld_conf.slurmd_port; _free_and_set(&conf->epilog, slurmctld_conf.epilog ); _free_and_set(&conf->prolog, slurmctld_conf.prolog ); _free_and_set(&conf->tmpfs, slurmctld_conf.tmp_fs ); - _free_and_set(&conf->pubkey, slurmctld_conf.job_credential_public_certificate ); + _free_and_set(&conf->pubkey, + slurmctld_conf.job_credential_public_certificate); _free_and_set(&conf->spooldir, slurmctld_conf.slurmd_spooldir); + _free_and_set(&conf->pidfile, slurmctld_conf.slurmd_pidfile); +} - debug3("Confile = `%s'", conf->conffile ); - debug3("Epilog = `%s'", conf->epilog ); - debug3("Logfile = `%s'", conf->logfile ); +static void +_print_conf() +{ + debug3("Confile = `%s'", conf->conffile); + debug3("Epilog = `%s'", conf->epilog); + debug3("Logfile = `%s'", conf->logfile); debug3("Port = %u", conf->port); - debug3("Prolog = `%s'", conf->prolog ); - debug3("TmpFS = `%s'", conf->tmpfs ); - debug3("Public Cert = `%s'", conf->pubkey ); - debug3("Spool Dir = `%s'", conf->spooldir ); + debug3("Prolog = `%s'", conf->prolog); + debug3("TmpFS = `%s'", conf->tmpfs); + debug3("Public Cert = `%s'", conf->pubkey); + debug3("Spool Dir = `%s'", conf->spooldir); + debug3("Pid File = `%s'", conf->pidfile); + } static void @@ -374,7 +383,7 @@ static void _init_conf() { char host[MAXHOSTNAMELEN]; - log_options_t lopts = LOG_OPTS_STDERR_ONLY; + log_options_t lopts = LOG_OPTS_INITIALIZER; if (getnodename(host, MAXHOSTNAMELEN) < 0) { error("Unable to get my hostname: %m"); @@ -384,13 +393,14 @@ _init_conf() conf->conffile = NULL; conf->epilog = NULL; conf->logfile = NULL; - conf->port = 0; - conf->spooldir = NULL; conf->pubkey = NULL; conf->prolog = NULL; - conf->daemonize = 0; + conf->port = 0; + conf->daemonize = 1; conf->lfd = -1; conf->log_opts = lopts; + conf->pidfile = xstrdup(DEFAULT_PIDFILE); + conf->spooldir = xstrdup(DEFAULT_SPOOLDIR); return; } @@ -404,10 +414,12 @@ _process_cmdline(int ac, char **av) while ((c = getopt(ac, av, GETOPT_ARGS)) > 0) { switch (c) { case 'D': - conf->daemonize = 1; + conf->daemonize = 0; break; case 'v': conf->log_opts.stderr_level++; + conf->log_opts.logfile_level++; + conf->log_opts.syslog_level++; break; case 'h': _usage(); @@ -512,46 +524,15 @@ _usage() "\tPrint this help message.\n"); } -/* create spool directory as needed and "cd" to it */ -static void -_setdir(void) -{ - struct stat sbuf; - - if (conf->spooldir) { - if (stat (conf->spooldir, &sbuf) == -1) { - if (_mkdir2(conf->spooldir, 0700)) - error ("mkdir2 on %s error %m", conf->spooldir); - _free_and_set(&conf->spooldir, xstrdup("/tmp") ); - } - } else { - _free_and_set(&conf->spooldir, xstrdup("/tmp") ); - } - chdir(conf->spooldir); -} - -/* _mkdir2 - create a directory, does system call if root, runs mkdir otherwise */ -static int -_mkdir2 (char * path, int modes) +/* create spool directory as needed and "cd" to it + */ +static void +_set_slurmd_spooldir(void) { - char *cmd; - int error_code; - - if (getuid() == 0) { - if (mknod (path, S_IFDIR | modes, 0)) - return errno; - } + if ((mkdir(conf->spooldir, 0755) < 0) && (errno != EEXIST)) + error("mkdir(%s): %m", conf->spooldir); - else { - cmd = xstrdup ("/bin/mkdir "); - xstrcat (cmd, path); - error_code = system (cmd); - xfree (cmd); - if (error_code) - return error_code; - (void) chmod (path, modes); - } - - return SLURM_SUCCESS; + if (chdir(conf->spooldir) < 0) + fatal("chdir(%s): %m", conf->spooldir); } diff --git a/src/slurmd/slurmd.h b/src/slurmd/slurmd.h index b110d5848357f7333f44d679dd3aab3789b824c8..923fa1755118a3b052da7508e90666449bccdf30 100644 --- a/src/slurmd/slurmd.h +++ b/src/slurmd/slurmd.h @@ -28,7 +28,7 @@ #define _SLURMD_H #if HAVE_CONFIG_H -# include <config.h> +# include "config.h" # if HAVE_INTTYPES_H # include <inttypes.h> # else @@ -40,10 +40,10 @@ # include <inttypes.h> #endif /* HAVE_CONFIG_H */ -#include <src/common/log.h> -#include <src/common/list.h> -#include <src/common/slurm_protocol_api.h> -#include <src/common/signature_utils.h> +#include "src/common/log.h" +#include "src/common/list.h" +#include "src/common/slurm_protocol_api.h" +#include "src/common/signature_utils.h" /* * Global config type @@ -55,6 +55,7 @@ typedef struct slurmd_config { char *conffile; /* config filename */ char *logfile; /* slurmd logfile, if any */ char *spooldir; /* SlurmdSpoolDir */ + char *pidfile; /* PidFile location */ char *nodename; /* this node's hostname */ char *tmpfs; /* directory of tmp FS */ char *pubkey; /* location of job cred public key */ diff --git a/src/srun/launch.c b/src/srun/launch.c index ca3b22fc487a451f92a3380c1d18f3a22202e887..af14026cbdc0c0d343e08dde9388e39bc09d8094 100644 --- a/src/srun/launch.c +++ b/src/srun/launch.c @@ -99,7 +99,7 @@ launch(void *arg) slurm_msg_t *req_array_ptr; launch_tasks_request_msg_t *msg_array_ptr; job_t *job = (job_t *) arg; - int i, my_envc, taskid; + int i, my_envc; char hostname[MAXHOSTNAMELEN]; uint32_t **task_ids; @@ -115,10 +115,9 @@ launch(void *arg) task_ids = (uint32_t **) xmalloc(job->nhosts * sizeof(uint32_t *)); for (i = 0; i < job->nhosts; i++) task_ids[i] = (uint32_t *) xmalloc(job->cpus[i]*sizeof(uint32_t)); - taskid = 0; if (opt.distribution == SRUN_DIST_BLOCK) _dist_block(job, task_ids); - else /* (opt.distribution == SRUN_DIST_CYCLIC) */ + else _dist_cyclic(job, task_ids); msg_array_ptr = (launch_tasks_request_msg_t *) @@ -143,6 +142,13 @@ launch(void *arg) r->nnodes = job->nhosts; r->nprocs = opt.nprocs; + if (opt.output == IO_PER_TASK) + r->ofname = opt.ofname; + if (opt.error == IO_PER_TASK) + r->efname = opt.efname; + if (opt.input == IO_PER_TASK) + r->ifname = opt.ifname; + /* Node specific message contents */ r->tasks_to_launch = job->ntask[i]; r->global_task_ids = task_ids[i]; @@ -191,6 +197,7 @@ static void p_launch(slurm_msg_t *req_array_ptr, job_t *job) debug("Node %s is unused",job->host[i]); continue; } + pthread_mutex_lock(&active_mutex); while (active >= opt.max_threads) { pthread_cond_wait(&active_cond, &active_mutex); @@ -210,7 +217,7 @@ static void p_launch(slurm_msg_t *req_array_ptr, job_t *job) if (pthread_attr_setscope (&thread_ptr[i].attr, PTHREAD_SCOPE_SYSTEM)) error ("pthread_attr_setscope error %m"); #endif - while ( pthread_create (&thread_ptr[i].thread, + if ( pthread_create (&thread_ptr[i].thread, &thread_ptr[i].attr, p_launch_task, (void *) task_info_ptr) ) { @@ -218,6 +225,7 @@ static void p_launch(slurm_msg_t *req_array_ptr, job_t *job) /* just run it under this thread */ p_launch_task((void *) task_info_ptr); } + } pthread_mutex_lock(&active_mutex); diff --git a/src/srun/srun.c b/src/srun/srun.c index 4dc7d8f24b55b476fb197d66ac33efb97938c8a3..5c3985c499b36b4e6ed81ce892db85ed81a37114 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -107,10 +107,11 @@ static char *get_shell (void); static int is_file_text (char *fname, char** shell_ptr); static int run_batch_job (void); static allocation_resp *existing_allocation(void); -static void run_job_script(uint32_t job_id); +static void run_job_script(uint32_t jobid); static void fwd_signal(job_t *job, int signo); static void p_fwd_signal(slurm_msg_t *req_array_ptr, job_t *job); static void *p_signal_task(void *args); +static int _set_batch_script_env(uint32_t jobid); #ifdef HAVE_LIBELAN3 # include <src/common/qsw.h> @@ -172,12 +173,14 @@ main(int ac, char **av) } else if (opt.allocate) { if ( !(resp = allocate_nodes()) ) exit(1); + job = job_create(resp); if (_verbose || _debug) print_job_information(resp); else printf("jobid %u\n", resp->job_id); run_job_script(resp->job_id); slurm_complete_job(resp->job_id, 0, 0); + if (_verbose || _debug) info ("Spawned srun shell terminated"); exit (0); @@ -643,7 +646,7 @@ static void * p_signal_task(void *args) static int run_batch_job(void) { - int i, file_type, rc, retries; + int file_type, rc, retries; job_desc_msg_t job; submit_response_msg_t *resp; extern char **environ; @@ -694,13 +697,13 @@ run_batch_job(void) if (opt.share) job.shared = 1; + _set_batch_script_env(0); job.environment = environ; - for (i=0; ; i++) { - if (environ[i] == NULL) { - job.env_size = (i - 1); - break; - } - } + + job.env_size = 0; + while (environ[job.env_size] != NULL) + job.env_size++; + job.script = job_script; if (opt.efname) job.err = opt.efname; @@ -885,34 +888,48 @@ existing_allocation( void ) return resp; } -/* allocation option specified, spawn a script and wait for it to exit */ -void run_job_script (uint32_t job_id) +static int +_set_batch_script_env(uint32_t jobid) { - char *shell = NULL; - int i; - pid_t child; + char *dist = NULL; - if (setenvf("SLURM_JOBID=%u", job_id)) { - error("Unable to set SLURM_JOBID environment variable"); - return; + if (jobid > 0) { + if (setenvf("SLURM_JOBID=%u", jobid)) { + error("Unable to set SLURM_JOBID env var"); + return -1; + } } if (setenvf("SLURM_NNODES=%u", opt.nodes)) { error("Unable to set SLURM_NNODES environment variable"); - return; + return -1; } if (setenvf("SLURM_NPROCS=%u", opt.nprocs)) { error("Unable to set SLURM_NPROCS environment variable"); - return; + return -1; } - if (setenvf("SLURM_DISTRIBUTION=%s", - opt.distribution == SRUN_DIST_BLOCK ? "block" : "cyclic")) { + dist = opt.distribution == SRUN_DIST_BLOCK ? "block" : "cyclic"; + + if (setenvf("SLURM_DISTRIBUTION=%s", dist)) { error("Unable to set SLURM_DISTRIBUTION environment variable"); - return; + return -1; } + return 0; +} + +/* allocation option specified, spawn a script and wait for it to exit */ +void run_job_script (uint32_t jobid) +{ + char *shell = NULL; + int i; + pid_t child; + + if (_set_batch_script_env(jobid) < 0) + return; + /* determine shell from script (if any) or user default */ if (remote_argc) { char ** new_argv;