From 1671bdef584ac5206cb6c12d8d27fc11f09e8e03 Mon Sep 17 00:00:00 2001 From: Mark Grondona <mgrondona@llnl.gov> Date: Thu, 27 Feb 2003 19:36:08 +0000 Subject: [PATCH] o src/slurmd/mgr.c : fix a couple bugs in _wait_for_task_exit() o src/srun : move report_{job/task}_status into job.c report task exit on one line per msg (instead of per task) add new task state for abnormal (nonzero) exit. --- src/slurmd/mgr.c | 58 +++---- src/srun/io.c | 77 +--------- src/srun/io.h | 28 +++- src/srun/job.c | 390 ++++++++++++++++++++++++++++++----------------- src/srun/job.h | 13 +- src/srun/msg.c | 54 ++----- 6 files changed, 339 insertions(+), 281 deletions(-) diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c index 2427bc46f92..65409fe363c 100644 --- a/src/slurmd/mgr.c +++ b/src/slurmd/mgr.c @@ -551,47 +551,44 @@ _handle_task_exit(slurmd_job_t *job) } +/* + * Loop once through tasks looking for all tasks that have exited with + * the same exit status (and whose statuses have not been sent back to + * the client). Aggregate these tasks into a single task exit message. + * + */ static int _send_pending_exit_msgs(slurmd_job_t *job) { int i; - int n = 0; - int nsent = 0; - int status = 0; - bool gotStatus = false; + int nsent = 0; + int status = 0; + bool set = false; int tid[job->ntasks]; - + /* * Collect all exit codes with the same status into a - * single message. Count no. of exit codes sent as a - * side effect. - * + * single message. 
*/ for (i = 0; i < job->ntasks; i++) { task_info_t *t = job->task[i]; - if (!t->exited) + if (!t->exited || t->esent) continue; - if (t->esent) { - nsent++; - continue; - } - - if (!gotStatus) { + if (!set) { status = t->estatus; - gotStatus = true; + set = true; } else if (status != t->estatus) continue; - tid[n++] = t->gid; + tid[nsent++] = t->gid; t->esent = true; - nsent++; } - if (n) { - debug2("Aggregated %d task exit messages", n); - _send_exit_msg(job, tid, n, status); + if (nsent) { + debug2("Aggregated %d task exit messages", nsent); + _send_exit_msg(job, tid, nsent, status); } return nsent; @@ -607,8 +604,8 @@ static int _wait_for_task_exit(slurmd_job_t *job) { int rc = 0; - int nsent = 0; int timeout = -1; + int waiting = job->ntasks; int rfd = job->fdpair[0]; struct pollfd pfd[1]; @@ -618,7 +615,8 @@ _wait_for_task_exit(slurmd_job_t *job) fd_set_nonblocking(rfd); do { - int revents; + int revents = 0; + int nsent = 0; if ((rc = poll(pfd, 1, timeout)) < 0) { if (errno == EINTR) { @@ -634,8 +632,8 @@ _wait_for_task_exit(slurmd_job_t *job) if ((revents & (POLLIN|POLLHUP|POLLERR))) { if ((rc = _handle_task_exit(job)) <= 0) { - if (rc < 0) - error ("Unable to read task exit codes"); + if (rc < 0) + error("Unable to read task exit codes"); goto done; } @@ -645,16 +643,20 @@ _wait_for_task_exit(slurmd_job_t *job) } } - nsent = _send_pending_exit_msgs(job); + /* + * send all pending task exit messages + */ + while ((nsent = _send_pending_exit_msgs(job))) + waiting -= rc; timeout = -1; - } while (nsent < job->ntasks); + } while (waiting); return SLURM_SUCCESS; done: - _send_pending_exit_msgs(job); + while (_send_pending_exit_msgs(job)) {;} return SLURM_FAILURE; } diff --git a/src/srun/io.c b/src/srun/io.c index 243f6f653a7..3dec1344507 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -81,10 +81,8 @@ static int _do_task_input(job_t *job, int taskid); static int _do_task_input_poll(job_t *job, fd_info_t *info); static inline bool _io_thr_done(job_t *job); static 
int _handle_pollerr(fd_info_t *info); -static char * _host_state_name(host_state_t state_inx); static ssize_t _readn(int fd, void *buf, size_t nbytes); static ssize_t _readx(int fd, char *buf, size_t maxbytes); -static char * _task_state_name(task_state_t state_inx); static int _validate_header(slurm_io_stream_header_t *hdr, job_t *job); #define _poll_set_rd(_pfd, _fd) do { \ @@ -433,31 +431,6 @@ static void _do_poll_timeout (job_t *job) } } -void report_job_status(job_t *job) -{ - int i; - - for (i = 0; i < job->nhosts; i++) { - info ("host:%s state:%s", job->host[i], - _host_state_name(job->host_state[i])); - } -} - -static char *_host_state_name(host_state_t state_inx) -{ - switch (state_inx) { - case SRUN_HOST_INIT: - return "initial"; - case SRUN_HOST_CONTACTED: - return "contacted"; - case SRUN_HOST_UNREACHABLE: - return "unreachable"; - case SRUN_HOST_REPLIED: - return "replied"; - default: - return "unknown"; - } -} static inline bool _io_thr_done(job_t *job) @@ -469,53 +442,6 @@ _io_thr_done(job_t *job) return retval; } -void report_task_status(job_t *job) -{ - int i; - char buf[1024]; - hostlist_t hl[5]; - - for (i = 0; i < 5; i++) - hl[i] = hostlist_create(NULL); - - for (i = 0; i < opt.nprocs; i++) { - int state = job->task_state[i]; - if ((state == SRUN_TASK_EXITED) - && ((job->err[i] >= 0) || (job->out[i] >= 0))) - state = 4; - snprintf(buf, 256, "task%d", i); - hostlist_push(hl[state], buf); - } - - for (i = 0; i< 5; i++) { - if (hostlist_count(hl[i]) > 0) { - hostlist_ranged_string(hl[i], 1024, buf); - info("%s: %s", buf, _task_state_name(i)); - } - - hostlist_destroy(hl[i]); - } - -} - -static char *_task_state_name(task_state_t state_inx) -{ - switch (state_inx) { - case SRUN_TASK_INIT: - return "initializing"; - case SRUN_TASK_RUNNING: - return "running"; - case SRUN_TASK_FAILED: - return "failed"; - case SRUN_TASK_EXITED: - return "exited"; - case SRUN_TASK_IO_WAIT: - return "waiting for io"; - default: - return "unknown"; - } -} - static int 
_stdin_open(char *filename) { @@ -913,7 +839,8 @@ _bcast_stdin(int fd, job_t *job) return; } -static int _validate_header(slurm_io_stream_header_t *hdr, job_t *job) +static +int _validate_header(slurm_io_stream_header_t *hdr, job_t *job) { if (hdr->version != SLURM_PROTOCOL_VERSION) { error("Invalid header version, notify administrators"); diff --git a/src/srun/io.h b/src/srun/io.h index db32a656375..372856251ef 100644 --- a/src/srun/io.h +++ b/src/srun/io.h @@ -1,3 +1,29 @@ +/*****************************************************************************\ + * src/srun/io.h - srun I/O routines + * $Id$ + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. 
+\*****************************************************************************/ #ifndef _HAVE_IO_H #define _HAVE_IO_H @@ -8,8 +34,6 @@ void *io_thr(void *arg); int io_thr_create(job_t *job); -void report_job_status(job_t *job); -void report_task_status(job_t *job); int open_streams(job_t *job); #endif /* !_HAVE_IO_H */ diff --git a/src/srun/job.c b/src/srun/job.c index 11a5575911e..86b7a66b2d8 100644 --- a/src/srun/job.c +++ b/src/srun/job.c @@ -49,6 +49,11 @@ #include "src/srun/attach.h" #endif + +/* + * allocation information structure used to store general information + * about node allocation to be passed to _job_create_internal() + */ typedef struct allocation_info { uint32_t jobid; uint32_t stepid; @@ -61,6 +66,224 @@ typedef struct allocation_info { } allocation_info_t; + +/* + * Prototypes: + */ +static inline int _estimate_nports(int nclients, int cli_per_port); +static int _compute_task_count(allocation_info_t *info); +static void _set_nprocs(allocation_info_t *info); +static job_t * _job_create_internal(allocation_info_t *info); +static void _job_fake_cred(job_t *job); +static char * _task_state_name(task_state_t state_inx); +static char * _host_state_name(host_state_t state_inx); + + +/* + * Create an srun job structure from a resource allocation response msg + */ +job_t * +job_create_allocation(resource_allocation_response_msg_t *resp) +{ + job_t *job; + allocation_info_t *info = xmalloc(sizeof(*info)); + + info->nodelist = resp->node_list; + info->nnodes = resp->node_cnt; + info->jobid = resp->job_id; + info->stepid = NO_VAL; + info->num_cpu_groups = resp->num_cpu_groups; + info->cpus_per_node = resp->cpus_per_node; + info->cpu_count_reps = resp->cpu_count_reps; + info->addrs = resp->node_addr; + + job = _job_create_internal(info); + + xfree(info); + + return (job); +} + +/* + * Create an srun job structure w/out an allocation response msg. + * (i.e. 
use the command line options) + */ +job_t * +job_create_noalloc(void) +{ + job_t *job = NULL; + allocation_info_t *info = xmalloc(sizeof(*info)); + int cpn = 1; + int i; + hostlist_t hl = hostlist_create(opt.nodelist); + + if (!hl) { + error("Invalid node list `%s' specified", opt.nodelist); + goto error; + } + + srand48(getpid()); + info->jobid = (uint32_t) (lrand48() % 65550L + 1L); + info->stepid = 0; + info->nodelist = opt.nodelist; + info->nnodes = hostlist_count(hl); + + /* if (opt.nprocs < info->nnodes) + opt.nprocs = hostlist_count(hl); + */ + hostlist_destroy(hl); + + info->cpus_per_node = &cpn; + info->cpu_count_reps = &opt.nprocs; + info->addrs = NULL; + + /* + * Create job, then fill in host addresses + */ + job = _job_create_internal(info); + + for (i = 0; i < job->nhosts; i++) { + slurm_set_addr ( &job->slurmd_addr[i], + slurm_get_slurmd_port(), + job->host[i] ); + } + + _job_fake_cred(job); + + error: + xfree(info); + return (job); + +} + + +void +update_job_state(job_t *job, job_state_t state) +{ + pthread_mutex_lock(&job->state_mutex); + if (job->state < state) { + job->state = state; + pthread_cond_signal(&job->state_cond); + } + pthread_mutex_unlock(&job->state_mutex); +} + + +void +job_force_termination(job_t *job) +{ + if (mode == MODE_ATTACH) { + info ("forcing detach"); + update_job_state(job, SRUN_JOB_DETACHED); + } else { + info ("forcing job termination"); + update_job_state(job, SRUN_JOB_FORCETERM); + } + + pthread_kill(job->ioid, SIGHUP); +} + + +void job_fatal(job_t *job, const char *msg) +{ + if (msg) error(msg); + + job_destroy(job, errno); + + exit(1); +} + + +void +job_destroy(job_t *job, int error) +{ + if (job->old_job) { + debug("cancelling job step %u.%u", job->jobid, job->stepid); + slurm_complete_job_step(job->jobid, job->stepid, 0, error); + } else if (!opt.no_alloc) { + debug("cancelling job %u", job->jobid); + slurm_complete_job(job->jobid, 0, error); + } else { + debug("no allocation to cancel"); + return; + } + +#ifdef 
HAVE_TOTALVIEW + if (error) tv_launch_failure(); +#endif +} + + +void +job_kill(job_t *job) +{ + if (!opt.no_alloc) { + if (slurm_kill_job_step(job->jobid, job->stepid, SIGKILL) < 0) + error ("slurm_kill_job_step: %m"); + } + update_job_state(job, SRUN_JOB_FAILED); +} + + +int +job_active_tasks_on_host(job_t *job, int hostid) +{ + int i; + int retval = 0; + + slurm_mutex_lock(&job->task_mutex); + for (i = 0; i < job->ntask[hostid]; i++) { + uint32_t tid = job->tids[hostid][i]; + if (job->task_state[tid] == SRUN_TASK_RUNNING) + retval++; + } + slurm_mutex_unlock(&job->task_mutex); + return retval; +} + +void +report_job_status(job_t *job) +{ + int i; + + for (i = 0; i < job->nhosts; i++) { + info ("host:%s state:%s", job->host[i], + _host_state_name(job->host_state[i])); + } +} + + +#define NTASK_STATES 6 +void +report_task_status(job_t *job) +{ + int i; + char buf[1024]; + hostlist_t hl[NTASK_STATES]; + + for (i = 0; i < NTASK_STATES; i++) + hl[i] = hostlist_create(NULL); + + for (i = 0; i < opt.nprocs; i++) { + int state = job->task_state[i]; + if ((state == SRUN_TASK_EXITED) + && ((job->err[i] >= 0) || (job->out[i] >= 0))) + state = 4; + snprintf(buf, 256, "task%d", i); + hostlist_push(hl[state], buf); + } + + for (i = 0; i< NTASK_STATES; i++) { + if (hostlist_count(hl[i]) > 0) { + hostlist_ranged_string(hl[i], 1022, buf); + info("%s: %s", buf, _task_state_name(i)); + } + hostlist_destroy(hl[i]); + } + +} + + static inline int _estimate_nports(int nclients, int cli_per_port) { @@ -207,28 +430,6 @@ _job_create_internal(allocation_info_t *info) return job; } -job_t * -job_create_allocation(resource_allocation_response_msg_t *resp) -{ - job_t *job; - allocation_info_t *info = xmalloc(sizeof(*info)); - - info->nodelist = resp->node_list; - info->nnodes = resp->node_cnt; - info->jobid = resp->job_id; - info->stepid = NO_VAL; - info->num_cpu_groups = resp->num_cpu_groups; - info->cpus_per_node = resp->cpus_per_node; - info->cpu_count_reps = resp->cpu_count_reps; - 
info->addrs = resp->node_addr; - - job = _job_create_internal(info); - - xfree(info); - - return (job); -} - static void _job_fake_cred(job_t *job) { @@ -246,129 +447,44 @@ _job_fake_cred(job_t *job) error ("close(/dev/random): %m"); } -job_t * -job_create_noalloc(void) -{ - job_t *job = NULL; - allocation_info_t *info = xmalloc(sizeof(*info)); - int cpn = 1; - int i; - hostlist_t hl = hostlist_create(opt.nodelist); - - if (!hl) { - error("Invalid node list `%s' specified", opt.nodelist); - goto error; - } - - srand48(getpid()); - info->jobid = (uint32_t) (lrand48() % 65550L + 1L); - info->stepid = 0; - info->nodelist = opt.nodelist; - info->nnodes = hostlist_count(hl); - - /* if (opt.nprocs < info->nnodes) - opt.nprocs = hostlist_count(hl); - */ - hostlist_destroy(hl); - - info->cpus_per_node = &cpn; - info->cpu_count_reps = &opt.nprocs; - info->addrs = NULL; - /* - * Create job, then fill in host addresses - */ - job = _job_create_internal(info); - for (i = 0; i < job->nhosts; i++) { - slurm_set_addr ( &job->slurmd_addr[i], - slurm_get_slurmd_port(), - job->host[i] ); - } - - _job_fake_cred(job); - - error: - xfree(info); - return (job); - -} - -void -update_job_state(job_t *job, job_state_t state) +static char * +_task_state_name(task_state_t state_inx) { - pthread_mutex_lock(&job->state_mutex); - if (job->state < state) { - job->state = state; - pthread_cond_signal(&job->state_cond); + switch (state_inx) { + case SRUN_TASK_INIT: + return "initializing"; + case SRUN_TASK_RUNNING: + return "running"; + case SRUN_TASK_FAILED: + return "failed"; + case SRUN_TASK_EXITED: + return "exited"; + case SRUN_TASK_IO_WAIT: + return "waiting for io"; + case SRUN_TASK_ABNORMAL_EXIT: + return "exited abnormally"; + default: + return "unknown"; } - pthread_mutex_unlock(&job->state_mutex); } -void -job_force_termination(job_t *job) +static char * +_host_state_name(host_state_t state_inx) { - if (mode == MODE_ATTACH) { - info ("forcing detach"); - update_job_state(job, 
SRUN_JOB_DETACHED); - } else { - info ("forcing job termination"); - update_job_state(job, SRUN_JOB_FORCETERM); + switch (state_inx) { + case SRUN_HOST_INIT: + return "initial"; + case SRUN_HOST_CONTACTED: + return "contacted"; + case SRUN_HOST_UNREACHABLE: + return "unreachable"; + case SRUN_HOST_REPLIED: + return "replied"; + default: + return "unknown"; } - - pthread_kill(job->ioid, SIGHUP); } -void job_fatal(job_t *job, const char *msg) -{ - if (msg) error(msg); - - job_destroy(job, errno); - - exit(1); -} -void -job_destroy(job_t *job, int error) -{ - if (job->old_job) { - debug("cancelling job step %u.%u", job->jobid, job->stepid); - slurm_complete_job_step(job->jobid, job->stepid, 0, error); - } else if (!opt.no_alloc) { - debug("cancelling job %u", job->jobid); - slurm_complete_job(job->jobid, 0, error); - } else { - debug("no allocation to cancel"); - return; - } - -#ifdef HAVE_TOTALVIEW - if (error) tv_launch_failure(); -#endif -} - -void -job_kill(job_t *job) -{ - if (!opt.no_alloc) { - if (slurm_kill_job_step(job->jobid, job->stepid, SIGKILL) < 0) - error ("slurm_kill_job_step: %m"); - } - update_job_state(job, SRUN_JOB_FAILED); -} - -int -job_active_tasks_on_host(job_t *job, int hostid) -{ - int i; - int retval = 0; - - slurm_mutex_lock(&job->task_mutex); - for (i = 0; i < job->ntask[hostid]; i++) { - uint32_t tid = job->tids[hostid][i]; - if (job->task_state[tid] == SRUN_TASK_RUNNING) - retval++; - } - slurm_mutex_unlock(&job->task_mutex); - return retval; -} diff --git a/src/srun/job.h b/src/srun/job.h index acda48fc949..101e54e92ab 100644 --- a/src/srun/job.h +++ b/src/srun/job.h @@ -66,7 +66,8 @@ typedef enum { SRUN_TASK_RUNNING, SRUN_TASK_FAILED, SRUN_TASK_IO_WAIT, - SRUN_TASK_EXITED + SRUN_TASK_EXITED, + SRUN_TASK_ABNORMAL_EXIT } task_state_t; @@ -161,4 +162,14 @@ void job_kill(job_t *job); */ int job_active_tasks_on_host(job_t *job, int hostid); +/* + * report current task status + */ +void report_task_status(job_t *job); + +/* + * report 
current node status + */ +void report_job_status(job_t *job); + #endif /* !_HAVE_JOB_H */ diff --git a/src/srun/msg.c b/src/srun/msg.c index 09515cb76ce..1701d1554e2 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -75,7 +75,6 @@ static inline bool _job_msg_done(job_t *job); static void _launch_handler(job_t *job, slurm_msg_t *resp); static void _msg_thr_poll(job_t *job); static void _set_jfds_nonblocking(job_t *job); -static char * _taskid2hostname(int task_id, job_t * job); static void _print_pid_list(const char *host, int ntasks, uint32_t *pid, char *executable_name); @@ -345,28 +344,27 @@ _reattach_handler(job_t *job, slurm_msg_t *msg) static void _exit_handler(job_t *job, slurm_msg_t *exit_msg) { - int i; - task_exit_msg_t *msg = (task_exit_msg_t *) exit_msg->data; + task_exit_msg_t *msg = (task_exit_msg_t *) exit_msg->data; + hostlist_t hl = hostlist_create(NULL); + int status = msg->return_code; + int i; + char buf[1024]; - for (i=0; i<msg->num_tasks; i++) { + for (i = 0; i < msg->num_tasks; i++) { uint32_t taskid = msg->task_id_list[i]; if ((taskid < 0) || (taskid >= opt.nprocs)) { error("task exit resp has bad task id %d", taskid); - return; + continue; } - if (msg->return_code) - verbose("task %d from node %s exited with status %d", - taskid, _taskid2hostname(taskid, job), - msg->return_code); - else - debug("task %d exited with status 0", taskid); + snprintf(buf, sizeof(buf), "task%d", taskid); + hostlist_push(hl, buf); slurm_mutex_lock(&job->task_mutex); - job->tstatus[taskid] = msg->return_code; - if (msg->return_code) - job->task_state[taskid] = SRUN_TASK_FAILED; + job->tstatus[taskid] = status; + if (status) + job->task_state[taskid] = SRUN_TASK_ABNORMAL_EXIT; else { if ( (job->err[taskid] != IO_DONE) || (job->out[taskid] != IO_DONE) ) @@ -382,33 +380,13 @@ _exit_handler(job_t *job, slurm_msg_t *exit_msg) update_job_state(job, SRUN_JOB_TERMINATED); } } -} - -static char * _taskid2hostname (int task_id, job_t * job) -{ - int i, j, id = 0; - - if 
(opt.distribution == SRUN_DIST_BLOCK) { - for (i=0; ((i<job->nhosts) && (id<opt.nprocs)); i++) { - id += job->ntask[i]; - if (task_id < id) - return job->host[i]; - } - - } else { /* cyclic */ - for (j=0; (id<opt.nprocs); j++) { /* cycle counter */ - for (i=0; ((i<job->nhosts) && (id<opt.nprocs)); i++) { - if (j >= job->cpus[i]) - continue; - if (task_id == (id++)) - return job->host[i]; - } - } - } - return "Unknown"; + hostlist_ranged_string(hl, sizeof(buf), buf); + verbose("%s exited with status %d", buf, status); + hostlist_destroy(hl); } + static void _handle_msg(job_t *job, slurm_msg_t *msg) { -- GitLab