Skip to content
Snippets Groups Projects
Commit aee5312d authored by Moe Jette
Browse files

Abort job for which launch RPC was received, but no response arrived.

parent e08a6568
No related branches found
No related tags found
No related merge requests found
......@@ -64,16 +64,16 @@ typedef struct fd_info {
time_t time_last_io;
static void _accept_io_stream(job_t *job, int i);
static int _do_task_output_poll(fd_info_t *info);
static int _handle_pollerr(fd_info_t *info);
static int _close_stream(int *fd, FILE *out, int tasknum);
static int _do_task_output(int *fd, FILE *out, int tasknum);
static void _bcast_stdin(int fd, job_t *job);
static ssize_t _readx(int fd, char *buf, size_t maxbytes);
static ssize_t _readn(int fd, void *buf, size_t nbytes);
static char * _next_line(char **str);
static int _validate_header(slurm_io_stream_header_t *hdr, job_t *job);
static void _accept_io_stream(job_t *job, int i);
static void _bcast_stdin(int fd, job_t *job);
static int _close_stream(int *fd, FILE *out, int tasknum);
static int _do_task_output(int *fd, FILE *out, int tasknum);
static int _do_task_output_poll(fd_info_t *info);
static int _handle_pollerr(fd_info_t *info);
static char * _next_line(char **str);
static ssize_t _readn(int fd, void *buf, size_t nbytes);
static ssize_t _readx(int fd, char *buf, size_t maxbytes);
static int _validate_header(slurm_io_stream_header_t *hdr, job_t *job);
#define _poll_set_rd(_pfd, _fd) do { \
(_pfd).fd = _fd; \
......@@ -129,7 +129,7 @@ _io_thr_poll(void *job_arg)
int numfds = (opt.nprocs*2) + job->niofds + 2;
fd_info_t map[numfds]; /* map fd in pollfd array to fd info */
int i, rc;
bool no_io_msg_sent = false;
static bool no_io_msg_sent = false;
xassert(job != NULL);
......@@ -149,6 +149,7 @@ _io_thr_poll(void *job_arg)
_poll_set_rd(fds[i], job->iofd[i]);
}
_poll_set_rd(fds[i], STDIN_FILENO);
time_last_io = time(NULL);
while (1) {
int eofcnt = 0;
......@@ -171,14 +172,14 @@ _io_thr_poll(void *job_arg)
nfds++;
}
if ((job->out[i] == IO_DONE)
&& (job->err[i] == IO_DONE))
if ((job->out[i] == IO_DONE) &&
(job->err[i] == IO_DONE))
eofcnt++;
}
debug3("eofcnt == %d", eofcnt);
/* exit if we have received eof on all streams */
/* exit if we have received EOF on all streams */
if (eofcnt == opt.nprocs)
pthread_exit(0);
......@@ -188,13 +189,13 @@ _io_thr_poll(void *job_arg)
if (job->state == SRUN_JOB_FAILED)
pthread_exit(0);
else if (no_io_msg_sent)
continue;
;
else if (i > MAX_IO_WAIT_SEC) {
info("Warning: No I/O in %d seconds",
MAX_IO_WAIT_SEC);
no_io_msg_sent = true;
continue;
}
continue;
}
switch(errno) {
......@@ -225,13 +226,15 @@ _io_thr_poll(void *job_arg)
if (_poll_rd_isset(fds[i++]))
_bcast_stdin(STDIN_FILENO, job);
for (; i < nfds; i++) {
for ( ; i < nfds; i++) {
unsigned short revents = fds[i].revents;
xassert(!(revents & POLLNVAL));
if (revents & POLLERR || revents & POLLHUP)
if ((revents & POLLERR) ||
(revents & POLLHUP) ||
(revents & POLLNVAL))
_handle_pollerr(&map[i]);
if (revents & POLLIN && *map[i].fd > 0)
if ((revents & POLLIN) && (*map[i].fd > 0))
_do_task_output_poll(&map[i]);
}
}
......@@ -293,7 +296,7 @@ _accept_io_stream(job_t *job, int i)
return;
}
free_buf(buffer); /* NOTE: this frees msgbuf */
if (_validate_header(&hdr, job))
if (_validate_header(&hdr, job)) /* check key */
return;
......@@ -301,21 +304,18 @@ _accept_io_stream(job_t *job, int i)
* sends along some control information
*/
/* Do I even need this? */
fd_set_nonblocking(sd);
if ((hdr.task_id < 0) || (hdr.task_id >= opt.nprocs)) {
error ("Invalid task_id %d from %s",
hdr.task_id, buf);
continue;
}
fd_set_nonblocking(sd);
if (hdr.type == SLURM_IO_STREAM_INOUT)
job->out[hdr.task_id] = sd;
else
job->err[hdr.task_id] = sd;
/* FIXME: Need to check key */
verbose("accepted %s connection from %s task %ld, sd=%d",
(hdr.type ? "stderr" : "stdout"),
buf, hdr.task_id, sd );
......@@ -331,7 +331,7 @@ _close_stream(int *fd, FILE *out, int tasknum)
out == stdout ? "stdout" : "stderr");
fflush(out);
retval = shutdown(*fd, SHUT_RDWR);
if (retval >= 0 || errno != EBADF)
if ((retval >= 0) || (errno != EBADF))
close(*fd);
*fd = IO_DONE;
return retval;
......@@ -370,7 +370,8 @@ _readx(int fd, char *buf, size_t maxbytes)
if ((n = read(fd, (void *) buf, maxbytes)) < 0) {
if (errno == EINTR)
goto again;
if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK))
return 0;
error("readx fd %d: %m", fd, n);
return -1; /* shutdown socket, cleanup. */
......
......@@ -78,7 +78,8 @@ job_create(resource_allocation_response_msg_t *resp)
job->nhosts = hostlist_count(hl);
job->host = (char **) xmalloc(job->nhosts * sizeof(char *));
job->slurmd_addr = (slurm_addr *) xmalloc(job->nhosts * sizeof(slurm_addr));
job->slurmd_addr = (slurm_addr *) xmalloc(job->nhosts *
sizeof(slurm_addr));
job->cpus = (int *) xmalloc(job->nhosts * sizeof(int) );
job->ntask = (int *) xmalloc(job->nhosts * sizeof(int) );
......@@ -116,7 +117,8 @@ job_create(resource_allocation_response_msg_t *resp)
pthread_mutex_init(&job->task_mutex, NULL);
ntask = opt.nprocs;
tph = (ntask+job->nhosts-1) / job->nhosts; /* tasks per host, round up */
tph = (ntask+job->nhosts-1) /
job->nhosts; /* tasks per host, round up */
for(i = 0; i < job->nhosts; i++) {
job->host[i] = hostlist_shift(hl);
......@@ -133,7 +135,8 @@ job_create(resource_allocation_response_msg_t *resp)
cpu_cnt = 0;
}
memcpy(&job->slurmd_addr[i],
&resp->node_addr[i], sizeof(job->slurmd_addr[i]));
&resp->node_addr[i],
sizeof(job->slurmd_addr[i]));
} else {
job->cpus[i] = tph;
slurm_set_addr (&job->slurmd_addr[i],
......
......@@ -33,8 +33,9 @@
#endif
#include <errno.h>
#include <sys/poll.h>
#include <fcntl.h>
#include <sys/poll.h>
#include <time.h>
#include "src/common/fd.h"
#include "src/common/log.h"
......@@ -52,9 +53,21 @@
#include "src/srun/attach.h"
#endif
/* Idle threshold, in seconds: once _msg_thr_poll() has gone longer than
 * this without message activity it calls _confirm_launch_complete(). */
#define MAX_MSG_WAIT_SEC 60 /* max msg idle, secs, confirm launches */
/* Timeout, in milliseconds, handed to poll() by _msg_thr_poll() */
#define POLL_TIMEOUT_MSEC 500
/* Timestamp refreshed by _msg_thr_poll() before it enters its loop and
 * again each time poll() reports activity. Declared without "static";
 * NOTE(review): presumably referenced from another file -- confirm. */
time_t time_last_msg;
/* NOTE(review): users of the next two variables are not visible in this
 * excerpt; presumably a count of exited tasks and the SLURM user id --
 * confirm against the rest of the file. */
static int tasks_exited = 0;
static uint32_t slurm_user_id;
/* Forward declarations of the file-local helpers */
static void _accept_msg_connection(job_t *job, int fdnum);
static void _confirm_launch_complete(job_t *job);
static void _exit_handler(job_t *job, slurm_msg_t *exit_msg);
static void _handle_msg(job_t *job, slurm_msg_t *msg);
static void _launch_handler(job_t *job, slurm_msg_t *resp);
static void _msg_thr_poll(job_t *job);
static void _set_jfds_nonblocking(job_t *job);
#define _poll_set_rd(_pfd, _fd) do { \
(_pfd).fd = _fd; \
(_pfd).events = POLLIN; \
......@@ -69,7 +82,6 @@ static uint32_t slurm_user_id;
#define _poll_wr_isset(pfd) ((pfd).revents & POLLOUT)
#define _poll_err(pfd) ((pfd).revents & POLLERR)
#define POLL_TIMEOUT_MSEC 500
static void
_launch_handler(job_t *job, slurm_msg_t *resp)
......@@ -108,6 +120,24 @@ _launch_handler(job_t *job, slurm_msg_t *resp)
}
/*
 * _confirm_launch_complete - confirm that every host of the job has
 *	registered a successful launch.
 * IN job - job whose per-host state array is examined
 *
 * Walks job->host_state[] for all job->nhosts entries.  On the first host
 * whose state is not SRUN_HOST_REPLIED an error naming that host is logged,
 * the job is moved to SRUN_JOB_FAILED and the calling thread terminates via
 * pthread_exit().  Returns normally only when every host has replied.
 *
 * NOTE(review): the logged text misspells "terminating"; it is left as-is
 * here because the message is runtime output.
 */
static void
_confirm_launch_complete(job_t *job)
{
	int host_inx;

	for (host_inx = 0; host_inx < job->nhosts; host_inx++) {
		/* a host that already replied needs no action */
		if (job->host_state[host_inx] == SRUN_HOST_REPLIED)
			continue;

		error ("Node %s not responding, terminiating job step",
		       job->host[host_inx]);
		update_job_state(job, SRUN_JOB_FAILED);
		pthread_exit(0);
	}
}
static void
_exit_handler(job_t *job, slurm_msg_t *exit_msg)
{
......@@ -214,6 +244,7 @@ _msg_thr_poll(job_t *job)
struct pollfd *fds;
nfds_t nfds = job->njfds;
int i, rc;
static bool check_launch_msg_sent = false;
fds = xmalloc(job->njfds * sizeof(*fds));
......@@ -221,14 +252,21 @@ _msg_thr_poll(job_t *job)
for (i = 0; i < job->njfds; i++)
_poll_set_rd(fds[i], job->jfd[i]);
time_last_msg = time(NULL);
while (1) {
while ((rc = poll(fds, nfds, POLL_TIMEOUT_MSEC)) <= 0) {
if (rc == 0) { /* timeout */
i = time(NULL)-time_last_msg;
if (job->state == SRUN_JOB_FAILED)
pthread_exit(0);
else
continue;
else if (check_launch_msg_sent)
;
else if (i > MAX_MSG_WAIT_SEC) {
_confirm_launch_complete(job);
check_launch_msg_sent = true;
}
continue;
}
switch (errno) {
......@@ -243,9 +281,12 @@ _msg_thr_poll(job_t *job)
}
}
time_last_msg = time(NULL);
for (i = 0; i < job->njfds; i++) {
unsigned short revents = fds[i].revents;
if (revents & POLLERR)
if ((revents & POLLERR) ||
(revents & POLLHUP) ||
(revents & POLLNVAL))
error("poll error on jfd %d: %m", fds[i].fd);
else if (revents & POLLIN)
_accept_msg_connection(job, i);
......@@ -264,49 +305,3 @@ msg_thr(void *arg)
_msg_thr_poll(job);
return (void *)1;
}
/*
 * _msg_thr_one - thread body that services message connections arriving
 *	on the job's first message descriptor only (job->jfd[0]).
 * IN arg - pointer to the job_t being serviced (must not be NULL)
 * RET (void *) 0 once the accept/receive loop has been abandoned
 *
 * The thread makes itself asynchronously cancelable, then repeatedly
 * accepts a connection, receives one message on it, passes the message to
 * _handle_msg() and closes the connection.
 */
void *
_msg_thr_one(void *arg)
{
	job_t       *job = (job_t *) arg;
	slurm_fd     listen_fd;
	slurm_fd     conn_fd;
	slurm_msg_t *msg = NULL;
	slurm_addr   peer_addr;
	char         peer_str[256];

	xassert(job != NULL);

	/* allow this thread to be cancelled at any point */
	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

	listen_fd = job->jfd[0];

	for (;;) {
		conn_fd = slurm_accept_msg_conn(listen_fd, &peer_addr);
		if (conn_fd < 0) {
			error("_msg_thr_one/slurm_accept_msg_conn: %m");
			break;
		}

		slurm_print_slurm_addr(&peer_addr, peer_str, 256);
		debug2("got message connection from %s", peer_str);

		msg = xmalloc(sizeof(*msg));
		if (slurm_receive_msg(conn_fd, msg) == SLURM_SOCKET_ERROR) {
			/* NOTE(review): msg is not released on this path
			 * within this function -- confirm ownership. */
			error("_msg_thr_one/slurm_receive_msg: %m");
			slurm_close_accepted_conn(conn_fd);
			break;
		}

		/* record the connection so the handler can see it */
		msg->conn_fd = conn_fd;
		_handle_msg(job, msg);
		slurm_close_accepted_conn(conn_fd);
	}

	/* reached only after an accept or receive failure */
	return (void *)(0);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment