From 5d1332419bd9ab01c65e61223c3e94d64cd81aaf Mon Sep 17 00:00:00 2001
From: Mark Grondona <mgrondona@llnl.gov>
Date: Wed, 7 May 2003 00:39:40 +0000
Subject: [PATCH] o Remove all instances of constant poll() timeouts (only
 time out if needed)
 o Move checks for launch or exit timeouts from IO thread into msg.c
 o Add new "undocumented" options --max-launch-time and --max-exit-timeout
 for use in testing.
 o Do not wantonly terminate IO thread when job moves into state TERMINATED;
 only exit on FORCETERM. (This allows collection of error messages from
 slurmds.)
 o Small amount of code cleanup in io.c, signals.c
---
 src/srun/io.c      | 207 +++++++++++++++++++--------------------
 src/srun/job.c     |   6 ++
 src/srun/job.h     |   3 +
 src/srun/launch.c  |   6 +-
 src/srun/msg.c     | 145 ++++++++++++++++++++++---------
 src/srun/opt.c     |  30 +++++--
 src/srun/opt.h     |   2 +
 src/srun/signals.c |  92 ++++++++++----------
 8 files changed, 273 insertions(+), 218 deletions(-)

diff --git a/src/srun/io.c b/src/srun/io.c
index 678caf1c3c8..69d1b7973c2 100644
--- a/src/srun/io.c
+++ b/src/srun/io.c
@@ -55,12 +55,6 @@
 #include "src/srun/net.h"
 #include "src/srun/opt.h"
 
-#define IO_BUFSIZ 2048
-#define MAX_TERM_WAIT_SEC 60    /* max time since first task
-                                 * terminated, secs, warning msg */
-#define POLL_TIMEOUT_MSEC 500   /* max wait for i/o poll, msec */
-
-static time_t time_first_done = 0;
 static int fmt_width = 0;
 
 /* fd_info struct used in poll() loop to map fds back to task number,
@@ -76,7 +70,6 @@ typedef struct fd_info {
 static void _accept_io_stream(job_t *job, int i);
 static void _bcast_stdin(int fd, job_t *job);
 static int  _close_stream(int *fd, FILE *out, int tasknum);
-static void _do_poll_timeout(job_t *job);
 static int  _do_task_output(int *fd, FILE *out, cbuf_t buf, int tasknum);
 static int  _do_task_output_poll(fd_info_t *info);
 static int  _do_task_input(job_t *job, int taskid);
@@ -160,7 +153,7 @@ _set_iofds_nonblocking(job_t *job)
 }
 
 static void
-_update_task_state(job_t *job, int taskid)
+_update_task_io_state(job_t *job, int taskid)
 {
     slurm_mutex_lock(&job->task_mutex);
     if (job->task_state[taskid] == SRUN_TASK_IO_WAIT)
@@ -234,28 +227,17 @@ _flush_io(job_t *job)
     debug3("Read %dB from tasks, wrote %dB", nbytes, nwritten);
 }
 
-static void *
-_io_thr_poll(void *job_arg)
+static void
+_io_thr_init(job_t *job, struct pollfd *fds)
 {
-    job_t *job = (job_t *) job_arg;
-    struct pollfd *fds;
-    nfds_t nfds = 0;
-    int numfds = (opt.nprocs*2) + job->niofds + 2;
-    fd_info_t map[numfds];  /* map fd in pollfd array to fd info */
-    int i, rc, out_fd_state, err_fd_state;
+    int out_fd_state = WAITING_FOR_IO;
+    int err_fd_state = WAITING_FOR_IO;
+    int i;
 
     xassert(job != NULL);
 
-    debug3("IO thread pid = %ld", (long) getpid());
-
-    /* need ioport + msgport + stdin + 2*nprocs fds */
-    fds = xmalloc(numfds*sizeof(*fds));
-
     _set_iofds_nonblocking(job);
 
-    out_fd_state = WAITING_FOR_IO;
-    err_fd_state = WAITING_FOR_IO;
-
     if (job->ofname->type == IO_ALL)
         out_fd_state = WAITING_FOR_IO;
     else {
@@ -281,74 +263,103 @@
     for (i = 0; i < job->niofds; i++)
         _poll_set_rd(fds[i], job->iofd[i]);
-    while (!_io_thr_done(job)) {
-        int eofcnt = 0;
-        nfds = job->niofds; /* already have n ioport fds + stdin */
+}
 
-        if ((job->stdinfd >= 0) && stdin_open) {
-            _poll_set_rd(fds[nfds], job->stdinfd);
-            nfds++;
+static void
+_fd_info_init(fd_info_t *info, int taskid, int *pfd, FILE *fp, cbuf_t buf)
+{
+    info->taskid = taskid;
+    info->fd     = pfd;
+    info->fp     = fp;
+    info->buf    = buf;
+}
+
+static nfds_t
+_setup_pollfds(job_t *job, struct pollfd *fds, fd_info_t *map)
+{
+    int eofcnt = 0;
+    int i;
+    nfds_t nfds = job->niofds; /* already have n ioport fds + stdin */
+
+    if ((job->stdinfd >= 0) && stdin_open) {
+        _poll_set_rd(fds[nfds], job->stdinfd);
+        nfds++;
+    }
+
+    for (i = 0; i < opt.nprocs; i++) {
+
+        if (job->task_state[i] == SRUN_TASK_FAILED) {
+            job->out[i] = IO_DONE;
+            if (job->err[i] == WAITING_FOR_IO)
+                job->err[i] = IO_DONE;
         }
+        if (job->out[i] >= 0) {
 
-        for (i = 0; i < opt.nprocs; i++) {
-            if (job->out[i] >= 0) {
-                _poll_set_rd(fds[nfds], job->out[i]);
-
-                if ( (cbuf_used(job->inbuf[i]) > 0)
-                    || (stdin_got_eof && !job->stdin_eof[i]))
-                    fds[nfds].events |= POLLOUT;
-
-                map[nfds].taskid = i;
-                map[nfds].fd = &job->out[i];
-                map[nfds].fp = job->outstream;
-                map[nfds].buf = job->outbuf[i];
-                nfds++;
-            }
+            _poll_set_rd(fds[nfds], job->out[i]);
 
-            if (job->err[i] >= 0) {
-                _poll_set_rd(fds[nfds], job->err[i]);
-                map[nfds].taskid = i;
-                map[nfds].fd = &job->err[i];
-                map[nfds].fp = job->errstream;
-                map[nfds].buf = job->errbuf[i];
-                nfds++;
-            }
+            if ( (cbuf_used(job->inbuf[i]) > 0)
+                || (stdin_got_eof && !job->stdin_eof[i]))
+                fds[nfds].events |= POLLOUT;
 
-            if ( (job->out[i] == IO_DONE)
-                && (job->err[i] == IO_DONE) ) {
-                eofcnt++;
-                _update_task_state(job, i);
-            }
+            _fd_info_init( map + nfds, i, &job->out[i],
+                           job->outstream, job->outbuf[i] );
+            nfds++;
+        }
+        if (job->err[i] >= 0) {
+            _poll_set_rd(fds[nfds], job->err[i]);
+
+            _fd_info_init( map + nfds, i, &job->err[i],
+                           job->errstream, job->errbuf[i] );
+            nfds++;
         }
 
-        /* exit if we have received EOF on all streams */
-        if (eofcnt) {
-            if (eofcnt == opt.nprocs) {
-                debug("got EOF on all streams");
-                _flush_io(job);
-                pthread_exit(0);
-            }
-
-            if (time_first_done == 0)
-                time_first_done = time(NULL);
+        if ( (job->out[i] == IO_DONE)
+            && (job->err[i] == IO_DONE) ) {
+            eofcnt++;
+            _update_task_io_state(job, i);
         }
 
-        if (job->state == SRUN_JOB_FAILED) {
-            debug3("job state is failed");
+    }
+
+    /* exit if we have received EOF on all streams */
+    if (eofcnt) {
+        if (eofcnt == opt.nprocs) {
+            debug("got EOF on all streams");
             _flush_io(job);
             pthread_exit(0);
-        }
+        }
+    }
 
-        while ((!_io_thr_done(job)) &&
-               ((rc = poll(fds, nfds, POLL_TIMEOUT_MSEC)) <= 0)) {
+    return nfds;
+}
 
-            if (rc == 0) {  /* timeout */
-                _do_poll_timeout(job);
-                continue;
-            }
+static void *
+_io_thr_poll(void *job_arg)
+{
+    int i, rc;
+    job_t *job = (job_t *) job_arg;
+    int numfds = (opt.nprocs*2) + job->niofds + 2;
+    nfds_t nfds = 0;
+    struct pollfd fds[numfds];
+    fd_info_t map[numfds];  /* map fd in pollfd array to fd info */
+
+    xassert(job != NULL);
+
+    debug3("IO thread pid = %ld", (long) getpid());
+
+    _io_thr_init(job, fds);
+
+    while (!_io_thr_done(job)) {
+
+        nfds = _setup_pollfds(job, fds, map);
+
+        while ((!_io_thr_done(job)) &&
+               ((rc = poll(fds, nfds, -1)) <= 0)) {
 
             switch(errno) {
             case EINTR:
@@ -395,52 +406,12 @@ _io_thr_poll(void *job_arg)
             _do_task_input_poll(job, &map[i]);
         }
     }
-    xfree(fds);
-    return NULL;
-}
-
-static void _do_poll_timeout (job_t *job)
-{
-    int i, age, eofcnt = 0;
-    static bool term_msg_sent = false;
-
-
-    for (i = 0; ((i < opt.nprocs) && (time_first_done == 0)); i++) {
-        if (job->task_state[i] >= SRUN_TASK_FAILED) {
-            time_first_done = time(NULL);
-            break;
-        }
-    }
 
-    for (i = 0; i < opt.nprocs; i++) {
-        if ((job->err[i] == IO_DONE) && (job->out[i] == IO_DONE))
-            eofcnt++;
-    }
+    debug("IO thread exiting");
 
-    if (eofcnt == opt.nprocs) {
-        debug("In poll_timeout(): EOF on all streams");
-        _flush_io(job);
-        pthread_exit(0);
-    }
-
-    age = time(NULL) - time_first_done;
-    if (job->state == SRUN_JOB_FAILED) {
-        debug3("job failed, exiting IO thread");
-        pthread_exit(0);
-    } else if (time_first_done && opt.max_wait && (age >= opt.max_wait)) {
-        error("First task exited %d second%s ago",
-              age, age > 1 ? "s" : "" );
-        report_task_status(job);
-        update_job_state(job, SRUN_JOB_FAILED);
-    } else if (time_first_done && (term_msg_sent == false) &&
-               (age >= MAX_TERM_WAIT_SEC)) {
-        info("Warning: First task terminated %d second%s ago",
-             age, age > 1 ? "s" : "");
-        term_msg_sent = true;
-    }
+    return NULL;
 }
 
-
 static inline bool
 _io_thr_done(job_t *job)
 {
diff --git a/src/srun/job.c b/src/srun/job.c
index 57980368546..0d9f0dffefc 100644
--- a/src/srun/job.c
+++ b/src/srun/job.c
@@ -365,6 +365,12 @@ _job_create_internal(allocation_info_t *info)
     job->stepid  = info->stepid;
     job->old_job = false;
 
+    /*
+     * Initialize launch and exit timeout values
+     */
+    job->ltimeout = 0;
+    job->etimeout = 0;
+
     job->slurmd_addr = xmalloc(job->nhosts * sizeof(slurm_addr));
     if (info->addrs)
         memcpy( job->slurmd_addr, info->addrs,
diff --git a/src/srun/job.h b/src/srun/job.h
index cd5910e0d47..141742e5270 100644
--- a/src/srun/job.h
+++ b/src/srun/job.h
@@ -118,6 +118,9 @@ typedef struct srun_job {
 
     pthread_t lid;            /* launch thread id */
 
+    time_t ltimeout;          /* Time by which all tasks must be running */
+    time_t etimeout;          /* exit timeout (see opt.max_wait) */
+
     host_state_t *host_state; /* nhost host states */
 
     int *tstatus;             /* ntask exit statii */
diff --git a/src/srun/launch.c b/src/srun/launch.c
index eafe58e7627..796bfdec739 100644
--- a/src/srun/launch.c
+++ b/src/srun/launch.c
@@ -206,8 +206,8 @@ launch(void *arg)
     xfree(req_array_ptr);
 
     if (fail_launch_cnt) {
-        error("%d task launch requests failed, terminating job step",
-              fail_launch_cnt);
+        error("%d launch request%s failed, terminating job step",
+              fail_launch_cnt, fail_launch_cnt > 1 ? "s" : "");
         job->rc = 124;
         job_kill(job);
     } else {
@@ -253,6 +253,8 @@ static void _p_launch(slurm_msg_t *req_array_ptr, job_t *job)
                                  PTHREAD_SCOPE_SYSTEM))
             error ("pthread_attr_setscope error %m");
 #endif
+        job->ltimeout = time(NULL) + opt.max_launch_time;
+
         if ( pthread_create ( &thread_ptr[i].thread,
                               &thread_ptr[i].attr,
                               _p_launch_task,
diff --git a/src/srun/msg.c b/src/srun/msg.c
index e5bc376810a..a5205487bf9 100644
--- a/src/srun/msg.c
+++ b/src/srun/msg.c
@@ -1,9 +1,10 @@
 /****************************************************************************\
  *  msg.c - process message traffic between srun and slurm daemons
+ *  $Id$
 *****************************************************************************
  *  Copyright (C) 2002 The Regents of the University of California.
  *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
- *  Written by Mark Grondona <grondona@llnl.gov>, et. al.
+ *  Written by Mark Grondona <mgrondona@llnl.gov>, et al.
  *  UCRL-CODE-2002-040.
 *
 *  This file is part of SLURM, a resource management program.
@@ -62,19 +63,22 @@
 #endif
 
 #define LAUNCH_WAIT_SEC 60 /* max wait to confirm launches, sec */
-#define POLL_TIMEOUT_MSEC 500
 
-static time_t time_first_launch = 0;
+static int   tasks_exited = 0;
+static uid_t slurm_uid;
 
-static int   tasks_exited = 0;
-static uid_t slurm_uid;
+/*
+ * Static prototypes
+ */
 static void _accept_msg_connection(job_t *job, int fdnum);
 static void _confirm_launch_complete(job_t *job);
 static void _exit_handler(job_t *job, slurm_msg_t *exit_msg);
 static void _handle_msg(job_t *job, slurm_msg_t *msg);
 static inline bool _job_msg_done(job_t *job);
 static void _launch_handler(job_t *job, slurm_msg_t *resp);
+static void _do_poll_timeout(job_t *job);
+static int  _get_next_timeout(job_t *job);
 static void _msg_thr_poll(job_t *job);
 static void _set_jfds_nonblocking(job_t *job);
 static void _print_pid_list(const char *host, int ntasks,
@@ -224,10 +228,6 @@ update_failed_tasks(job_t *job, uint32_t nodeid)
     slurm_mutex_lock(&job->task_mutex);
     for (i = 0; i < job->ntask[nodeid]; i++) {
         uint32_t tid = job->tids[nodeid][i];
-        if (job->err[tid] == WAITING_FOR_IO)
-            job->err[tid] = IO_DONE;
-        if (job->out[tid] == WAITING_FOR_IO)
-            job->out[tid] = IO_DONE;
         job->task_state[tid] = SRUN_TASK_FAILED;
         tasks_exited++;
     }
@@ -237,7 +237,6 @@ update_failed_tasks(job_t *job, uint32_t nodeid)
         debug2("all tasks exited");
         update_job_state(job, SRUN_JOB_TERMINATED);
     }
-
 }
 
 static void
@@ -257,11 +256,15 @@ _launch_handler(job_t *job, slurm_msg_t *resp)
         job->host_state[msg->srun_node_id] = SRUN_HOST_REPLIED;
         slurm_mutex_unlock(&job->task_mutex);
 
+        update_failed_tasks(job, msg->srun_node_id);
+
+        /*
         if (!opt.no_kill) {
             job->rc = 124;
-            update_job_state(job, SRUN_JOB_FAILED);
+            update_job_state(job, SRUN_JOB_WAITING_ON_IO);
         } else
             update_failed_tasks(job, msg->srun_node_id);
+        */
 #ifdef HAVE_TOTALVIEW
         tv_launch_failure();
 #endif
@@ -289,6 +292,11 @@ _confirm_launch_complete(job_t *job)
             pthread_exit(0);
         }
     }
+
+    /*
+     * Reset launch timeout so timer will no longer go off
+     */
+    job->ltimeout = 0;
 }
 
 static void
@@ -418,6 +426,9 @@ _exit_handler(job_t *job, slurm_msg_t *exit_msg)
     int i;
     char buf[1024];
 
+    if (!job->etimeout && !tasks_exited)
+        job->etimeout = time(NULL) + opt.max_exit_timeout;
+
     for (i = 0; i < msg->num_tasks; i++) {
         uint32_t taskid = msg->task_id_list[i];
 
@@ -504,7 +515,7 @@ _accept_msg_connection(job_t *job, int fdnum)
     short port;
 
     if ((fd = slurm_accept_msg_conn(job->jfd[fdnum], &cli_addr)) < 0) {
-        error("_accept_msg_connection/slurm_accept_msg_conn: %m");
+        error("Unable to accept connection: %m");
         return;
     }
 
@@ -516,8 +527,7 @@ _accept_msg_connection(job_t *job, int fdnum)
     if (slurm_receive_msg(fd, msg) == SLURM_SOCKET_ERROR) {
         if (errno == EINTR)
             goto again;
-        error("_accept_msg_connection/slurm_receive_msg(%s): %m",
-              host);
+        error("slurm_receive_msg[%s]: %m", host);
         xfree(msg);
 
     } else {
@@ -537,13 +547,83 @@ _set_jfds_nonblocking(job_t *job)
         fd_set_nonblocking(job->jfd[i]);
 }
 
+/*
+ * Call poll() with a timeout. (timeout argument is in seconds)
+ */
+static int
+_do_poll(job_t *job, struct pollfd *fds, int timeout)
+{
+    nfds_t nfds = job->njfds;
+    int rc;
+
+    while ((rc = poll(fds, nfds, timeout * 1000)) < 0) {
+        switch (errno) {
+        case EINTR:  continue;
+        case ENOMEM:
+        case EFAULT: fatal("poll: %m");
+        default:     error("poll: %m. Continuing...");
+                     continue;
+        }
+    }
+
+    return rc;
+}
+
+
+/*
+ * Get the next timeout in seconds from now.
+ */
+static int
+_get_next_timeout(job_t *job)
+{
+    int timeout = -1;
+
+    if (!job->ltimeout && !job->etimeout)
+        return -1;
+
+    if (!job->ltimeout)
+        timeout = job->etimeout - time(NULL);
+    else if (!job->etimeout)
+        timeout = job->ltimeout - time(NULL);
+    else
+        timeout = job->ltimeout < job->etimeout ?
+                  job->ltimeout - time(NULL) :
+                  job->etimeout - time(NULL);
+
+    return timeout;
+}
+
+/*
+ * Handle the two poll timeout cases:
+ * 1. Job launch timed out
+ * 2. Exit timeout has expired (either print a message or kill job)
+ */
+static void
+_do_poll_timeout(job_t *job)
+{
+    time_t now = time(NULL);
+
+    if ((job->ltimeout > 0) && (job->ltimeout <= now))
+        _confirm_launch_complete(job);
+
+    if ((job->etimeout > 0) && (job->etimeout <= now)) {
+        if (!opt.max_wait)
+            info("Warning: first task terminated %ds ago",
+                 opt.max_exit_timeout);
+        else {
+            error("First task exited %ds ago", opt.max_wait);
+            report_task_status(job);
+            update_job_state(job, SRUN_JOB_FAILED);
+        }
+        job->etimeout = 0;
+    }
+}
+
 static void
 _msg_thr_poll(job_t *job)
 {
     struct pollfd *fds;
-    nfds_t nfds = job->njfds;
-    int i, rc;
-    static bool check_launch_msg_sent = false;
+    int i;
 
     fds = xmalloc(job->njfds * sizeof(*fds));
 
@@ -551,32 +631,12 @@
     for (i = 0; i < job->njfds; i++)
         _poll_set_rd(fds[i], job->jfd[i]);
 
-    time_first_launch = time(NULL);
 
     while (!_job_msg_done(job)) {
-        while ((!_job_msg_done(job)) &&
-               ((rc = poll(fds, nfds, POLL_TIMEOUT_MSEC)) <= 0)) {
-            if (rc == 0) { /* timeout */
-                if (check_launch_msg_sent)
-                    ;
-                else if ((time(NULL) - time_first_launch) >
-                         LAUNCH_WAIT_SEC) {
-                    _confirm_launch_complete(job);
-                    check_launch_msg_sent = true;
-                }
-                continue;
-            }
-
-            switch (errno) {
-            case EINTR: continue;
-            case ENOMEM:
-            case EFAULT:
-                fatal("poll: %m");
-                break;
-            default:
-                error("poll: %m. trying again");
-                break;
-            }
+
+        if (_do_poll(job, fds, _get_next_timeout(job)) == 0) {
+            _do_poll_timeout(job);
+            continue;
         }
 
         for (i = 0; i < job->njfds; i++) {
@@ -598,8 +658,11 @@ msg_thr(void *arg)
     job_t *job = (job_t *) arg;
 
     debug3("msg thread pid = %ld", (long) getpid());
+
+    slurm_uid = (uid_t) slurm_get_slurm_user_id();
+
     _msg_thr_poll(job);
+
     return (void *)1;
 }
 
diff --git a/src/srun/opt.c b/src/srun/opt.c
index 43e4c2c3436..e5d7e4f61de 100644
--- a/src/srun/opt.c
+++ b/src/srun/opt.c
@@ -238,6 +238,10 @@ struct poptOption runTable[] = {
     {"wait", 'W', POPT_ARG_INT, &opt.max_wait, OPT_WAIT,
      "seconds to wait after first task ends before killing job", "sec"},
+    {"max-launch-time", '\0', POPT_ARG_INT | POPT_ARGFLAG_DOC_HIDDEN,
+     &opt.max_launch_time, 0, NULL, NULL },
+    {"max-exit-timeout", '\0', POPT_ARG_INT | POPT_ARGFLAG_DOC_HIDDEN,
+     &opt.max_exit_timeout, 0, NULL, NULL },
     POPT_TABLEEND
 };
 
@@ -581,15 +585,17 @@ static void _opt_default()
     opt.slurmd_debug = LOG_LEVEL_QUIET;
 
     /* constraint default (-1 is no constraint) */
-    opt.mincpus = -1;
-    opt.realmem = -1;
-    opt.tmpdisk = -1;
-
-    opt.hold = false;
-    opt.constraints = NULL;
-    opt.contiguous = false;
-    opt.nodelist = NULL;
-    opt.exc_nodes = NULL;
+    opt.mincpus          = -1;
+    opt.realmem          = -1;
+    opt.tmpdisk          = -1;
+
+    opt.hold             = false;
+    opt.constraints      = NULL;
+    opt.contiguous       = false;
+    opt.nodelist         = NULL;
+    opt.exc_nodes        = NULL;
+    opt.max_launch_time  = 60; /* 60 seconds to launch job */
+    opt.max_exit_timeout = 60; /* Warn user 60 seconds after task exit */
 
     mode = MODE_NORMAL;
 
@@ -1007,6 +1013,12 @@ _opt_verify(poptContext optctx)
         exit(1);
     }
 
+    /*
+     * --wait always overrides hidden max_exit_timeout
+     */
+    if (opt.max_wait)
+        opt.max_exit_timeout = opt.max_wait;
+
     return verified;
 }
 
diff --git a/src/srun/opt.h b/src/srun/opt.h
index b53aec96e97..b17aac9da3a 100644
--- a/src/srun/opt.h
+++ b/src/srun/opt.h
@@ -148,6 +148,8 @@ typedef struct srun_options {
     char *nodelist;         /* --nodelist=node1,node2,...     */
     char *exc_nodes;        /* --exclude=node1,node2,...   -x */
     bool no_alloc;          /* --no-allocate, -Z              */
+    int  max_launch_time;   /* Undocumented                   */
+    int  max_exit_timeout;  /* Undocumented                   */
 } opt_t;
 
diff --git a/src/srun/signals.c b/src/srun/signals.c
index 08bcfbe304e..c7e8bf09691 100644
--- a/src/srun/signals.c
+++ b/src/srun/signals.c
@@ -134,10 +134,9 @@ void
 fwd_signal(job_t *job, int signo)
 {
     int i;
-    slurm_msg_t *req_array_ptr;
+    slurm_msg_t *req;
     kill_tasks_msg_t msg;
 
-
     if (signo == SIGKILL || signo == SIGINT || signo == SIGTERM) {
         slurm_mutex_lock(&job->state_mutex);
         job->signaled = true;
@@ -151,7 +150,7 @@ fwd_signal(job_t *job, int signo)
     msg.job_step_id = job->stepid;
     msg.signal      = (uint32_t) signo;
 
-    req_array_ptr = xmalloc(sizeof(slurm_msg_t) * job->nhosts);
+    req = xmalloc(sizeof(slurm_msg_t) * job->nhosts);
 
     for (i = 0; i < job->nhosts; i++) {
         if (job->host_state[i] != SRUN_HOST_REPLIED) {
@@ -162,16 +161,16 @@ fwd_signal(job_t *job, int signo)
         if (job_active_tasks_on_host(job, i) == 0)
             continue;
 
-        req_array_ptr[i].msg_type = REQUEST_KILL_TASKS;
-        req_array_ptr[i].data = &msg;
-        memcpy( &req_array_ptr[i].address,
+        req[i].msg_type = REQUEST_KILL_TASKS;
+        req[i].data = &msg;
+        memcpy( &req[i].address,
                 &job->slurmd_addr[i], sizeof(slurm_addr));
     }
 
-    _p_fwd_signal(req_array_ptr, job);
+    _p_fwd_signal(req, job);
 
     debug2("All tasks have been signalled");
-    xfree(req_array_ptr);
+    xfree(req);
 }
 
 
@@ -268,15 +267,15 @@ _sig_thr(void *arg)
 }
 
 /* _p_fwd_signal - parallel (multi-threaded) task signaller */
-static void _p_fwd_signal(slurm_msg_t *req_array_ptr, job_t *job)
+static void _p_fwd_signal(slurm_msg_t *req, job_t *job)
 {
     int i;
-    task_info_t *task_info_ptr;
-    thd_t *thread_ptr;
+    task_info_t *tinfo;
+    thd_t *thd;
 
-    thread_ptr = xmalloc (job->nhosts * sizeof (thd_t));
+    thd = xmalloc(job->nhosts * sizeof (thd_t));
     for (i = 0; i < job->nhosts; i++) {
-        if (req_array_ptr[i].msg_type == 0)
+        if (req[i].msg_type == 0)
             continue; /* inactive task */
 
         slurm_mutex_lock(&active_mutex);
@@ -286,28 +285,26 @@ static void _p_fwd_signal(slurm_msg_t *req, job_t *job)
         active++;
         slurm_mutex_unlock(&active_mutex);
 
-        task_info_ptr = (task_info_t *)xmalloc(sizeof(task_info_t));
-        task_info_ptr->req_ptr = &req_array_ptr[i];
-        task_info_ptr->job_ptr = job;
-        task_info_ptr->host_inx = i;
+        tinfo = (task_info_t *)xmalloc(sizeof(task_info_t));
+        tinfo->req_ptr  = &req[i];
+        tinfo->job_ptr  = job;
+        tinfo->host_inx = i;
+
+        if ((errno = pthread_attr_init(&thd[i].attr)))
+            error("pthread_attr_init failed");
+
+        if (pthread_attr_setdetachstate(&thd[i].attr,
+                                        PTHREAD_CREATE_DETACHED))
+            error ("pthread_attr_setdetachstate failed");
 
-        if (pthread_attr_init (&thread_ptr[i].attr))
-            error ("pthread_attr_init error %m");
-        if (pthread_attr_setdetachstate (&thread_ptr[i].attr,
-                                         PTHREAD_CREATE_DETACHED))
-            error ("pthread_attr_setdetachstate error %m");
 #ifdef PTHREAD_SCOPE_SYSTEM
-        if (pthread_attr_setscope (&thread_ptr[i].attr,
-                                   PTHREAD_SCOPE_SYSTEM))
-            error ("pthread_attr_setscope error %m");
+        if (pthread_attr_setscope(&thd[i].attr, PTHREAD_SCOPE_SYSTEM))
+            error ("pthread_attr_setscope failed");
 #endif
-        while ( pthread_create (&thread_ptr[i].thread,
-                                &thread_ptr[i].attr,
-                                _p_signal_task,
-                                (void *) task_info_ptr) ) {
-            error ("pthread_create error %m");
-            /* just run it under this thread */
-            _p_signal_task((void *) task_info_ptr);
+        if (pthread_create( &thd[i].thread, &thd[i].attr,
+                            _p_signal_task, (void *) tinfo )) {
+            error ("pthread_create failed");
+            _p_signal_task((void *) tinfo);
         }
     }
 
@@ -317,33 +314,32 @@ static void _p_fwd_signal(slurm_msg_t *req_array_ptr, job_t *job)
         pthread_cond_wait(&active_cond, &active_mutex);
     }
     slurm_mutex_unlock(&active_mutex);
-    xfree(thread_ptr);
+    xfree(thd);
 }
 
 /* _p_signal_task - parallelized signal of a specific task */
 static void * _p_signal_task(void *args)
 {
+    int rc = SLURM_SUCCESS;
     task_info_t *info = (task_info_t *)args;
     slurm_msg_t *req  = info->req_ptr;
     job_t *job        = info->job_ptr;
-    int host_inx = info->host_inx;
-    slurm_msg_t resp;
-
-    debug3("sending signal to host %s", job->host[host_inx]);
-    if (slurm_send_recv_node_msg(req, &resp) < 0) /* Has timeout */
-        error("signal %s: %m", job->host[host_inx]);
-    else {
-        return_code_msg_t *rc = resp.data;
-        if (rc->return_code != 0) {
-            error("%s: Unable to fwd signal: %s",
-                  job->host[host_inx],
-                  slurm_strerror(rc->return_code));
-        }
+    char *host        = job->host[info->host_inx];
 
-        if (resp.msg_type == RESPONSE_SLURM_RC)
-            slurm_free_return_code_msg(resp.data);
+    debug3("sending signal to host %s", host);
+    if (slurm_send_recv_rc_msg(req, &rc) < 0) {
+        error("%s: signal: %m", host);
+        goto done;
     }
 
+    /*
+     * Report error unless it is "Invalid job id" which
+     * probably just means the tasks exited in the meantime.
+     */
+    if ((rc != 0) && (rc != ESLURM_INVALID_JOB_ID))
+        error("%s: signal: %s", host, slurm_strerror(rc));
+
+  done:
     slurm_mutex_lock(&active_mutex);
     active--;
     pthread_cond_signal(&active_cond);
-- 
GitLab
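
Note on the pattern above: the msg.c changes replace the fixed POLL_TIMEOUT_MSEC
wake-up with absolute deadlines -- job->ltimeout is armed when launch requests go
out (launch.c) and job->etimeout when the first task exits (_exit_handler) -- and
_get_next_timeout() converts whichever deadline is nearest into a poll() timeout,
returning -1 (block indefinitely) when neither is armed. What follows is a
minimal, self-contained sketch of that deadline-driven poll() loop in plain POSIX
C; the names (next_timeout_ms, handle_timeout) and the hard-coded demo deadlines
are illustrative stand-ins, not SLURM code.

#include <poll.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>

static time_t ltimeout;   /* launch deadline; 0 means "not armed" */
static time_t etimeout;   /* exit deadline;   0 means "not armed" */

/* Next poll() timeout in milliseconds, or -1 to block indefinitely. */
static int next_timeout_ms(void)
{
    time_t now = time(NULL), next = 0;

    if (ltimeout)
        next = ltimeout;
    if (etimeout && (!next || etimeout < next))
        next = etimeout;
    if (!next)
        return -1;              /* nothing pending: sleep in poll() */
    return next > now ? (int)(next - now) * 1000 : 0;
}

/* Fire and disarm whichever deadline(s) have expired. */
static void handle_timeout(void)
{
    time_t now = time(NULL);

    if (ltimeout && ltimeout <= now) {
        printf("launch-confirmation deadline expired\n");
        ltimeout = 0;
    }
    if (etimeout && etimeout <= now) {
        printf("exit-timeout deadline expired\n");
        etimeout = 0;
    }
}

int main(void)
{
    struct pollfd fds[1] = { { .fd = STDIN_FILENO, .events = POLLIN } };

    /* Demo: arm both deadlines, as launch.c and msg.c arm theirs. */
    ltimeout = time(NULL) + 1;
    etimeout = time(NULL) + 3;

    while (ltimeout || etimeout) {
        int rc = poll(fds, 1, next_timeout_ms());

        if (rc > 0 && (fds[0].revents & POLLIN)) {
            char buf[256];
            if (read(STDIN_FILENO, buf, sizeof(buf)) <= 0)
                fds[0].fd = -1;     /* EOF: stop watching stdin */
        }
        /* rc == 0 means poll() woke exactly at the earliest deadline;
         * rc < 0 (e.g. EINTR) simply retries, like _do_poll() above. */
        handle_timeout();
    }
    return 0;
}

As in _do_poll(), a negative timeout means the thread sleeps until a descriptor
is ready, so it takes no wake-ups at all when there is nothing left to check.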
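
The io.c side applies the same idea to the I/O thread: _setup_pollfds() rebuilds
the pollfd array and a parallel fd_info_t map on each pass, so poll() results can
be traced back to their task, finished streams simply drop out of the set, and
poll() itself blocks with no artificial timeout. Below is a toy version of that
rebuild-and-map loop, with two pipes standing in for task stdout -- again an
illustrative sketch, not srun code.

#include <poll.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#define NTASKS 2

typedef struct { int taskid; int *fd; } fd_info_t;  /* pollfd slot -> task */

int main(void)
{
    int out[NTASKS][2];
    const char *greeting[NTASKS] =
        { "hello from task 0\n", "hello from task 1\n" };

    for (int i = 0; i < NTASKS; i++) {
        if (pipe(out[i]) < 0)               /* stand-in for a task's stdout */
            return 1;
        write(out[i][1], greeting[i], strlen(greeting[i]));
        close(out[i][1]);                   /* writer done -> reader sees EOF */
    }

    int rdfd[NTASKS] = { out[0][0], out[1][0] };    /* -1 once drained */
    int open_cnt = NTASKS;

    while (open_cnt > 0) {
        /* Rebuild the pollfd array and fd->task map every iteration,
         * skipping descriptors that have already reached EOF. */
        struct pollfd fds[NTASKS];
        fd_info_t map[NTASKS];
        nfds_t nfds = 0;

        for (int i = 0; i < NTASKS; i++) {
            if (rdfd[i] < 0)
                continue;
            fds[nfds].fd     = rdfd[i];
            fds[nfds].events = POLLIN;
            map[nfds].taskid = i;
            map[nfds].fd     = &rdfd[i];
            nfds++;
        }

        if (poll(fds, nfds, -1) <= 0)       /* block: no artificial timeout */
            continue;

        for (nfds_t j = 0; j < nfds; j++) {
            if (!(fds[j].revents & (POLLIN | POLLHUP)))
                continue;
            char buf[128];
            ssize_t n = read(fds[j].fd, buf, sizeof(buf));
            if (n > 0)
                printf("task %d: %.*s", map[j].taskid, (int)n, buf);
            else {                          /* EOF: retire this descriptor */
                close(fds[j].fd);
                *map[j].fd = -1;
                open_cnt--;
            }
        }
    }
    return 0;
}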