diff --git a/src/srun/io.c b/src/srun/io.c index a94733f29a8dd911f930514a8a24dc1b33d5795e..926d607cca793dfe16d73be1c5d15c46c66899f6 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -34,9 +34,13 @@ void *io_thr(void *job_arg) fd_set rset, wset; int maxfd; int i, m; + struct timeval tv; xassert(job != NULL); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); + if (fcntl(job->iofd, F_SETFL, O_NONBLOCK) < 0) error("Unable to set nonblocking I/O on fd\n"); @@ -62,7 +66,9 @@ void *io_thr(void *job_arg) FD_SET(job->err[i], &rset); } - while ((m = select(maxfd+1, &rset, NULL, NULL, NULL)) < 0) { + tv.tv_sec = 0; + tv.tv_usec = 500; + while ((m = select(maxfd+1, &rset, NULL, NULL, &tv)) < 0) { if (errno != EINTR) fatal("Unable to handle I/O: %m", errno); } diff --git a/src/srun/job.c b/src/srun/job.c index f9b761b5acff391c1452416b987aeb2c5a553167..9ea19368f7b2ac6655870066cdffd4c665cc2a9c 100644 --- a/src/srun/job.c +++ b/src/srun/job.c @@ -22,6 +22,10 @@ job_create(resource_allocation_response_msg_t *resp) job_t *job = (job_t *) xmalloc(sizeof(*job)); + pthread_mutex_init(&job->state_mutex, NULL); + pthread_cond_init(&job->state_cond, NULL); + job->state = SRUN_JOB_INIT; + if (resp != NULL) { job->nodelist = xstrdup(resp->node_list); hl = hostlist_create(resp->node_list); @@ -54,6 +58,13 @@ job_create(resource_allocation_response_msg_t *resp) job->out = (int *) xmalloc(opt.nprocs * sizeof(int) ); job->err = (int *) xmalloc(opt.nprocs * sizeof(int) ); + /* ntask job states and statii */ + job->task_status = (int *) xmalloc(opt.nprocs * sizeof(int)); + job->task_state = + (task_state_t *) xmalloc(opt.nprocs * sizeof(task_state_t)); + + pthread_mutex_init(&job->task_mutex, NULL); + ntask = opt.nprocs; tph = ntask / job->nhosts; /* expect trucation of result here */ @@ -72,7 +83,5 @@ job_create(resource_allocation_response_msg_t *resp) job->ntask[i] = (ntask - tph) > 0 ? tph : ntask; } - job->lastfd = -1; - return job; } diff --git a/src/srun/job.h b/src/srun/job.h index a932851c33aea9182fd1f7584c0f00a2695842dc..98c605efd3190a32d48626202115236b02065d1a 100644 --- a/src/srun/job.h +++ b/src/srun/job.h @@ -10,11 +10,31 @@ #include <src/common/macros.h> #include <src/api/slurm.h> -typedef struct in_addr IA; +typedef enum { + SRUN_JOB_INIT = 0, + SRUN_JOB_LAUNCHING, + SRUN_JOB_STARTING, + SRUN_JOB_RUNNING, + SRUN_JOB_TERMINATING, + SRUN_JOB_OVERDONE +} job_state_t; + +typedef enum { + SRUN_TASK_INIT = 0, + SRUN_TASK_RUNNING, + SRUN_TASK_FAILED, + SRUN_TASK_EXITED +} task_state_t; + typedef struct srun_job { - uint32_t jobid; /* assigned job id */ - uint32_t stepid; /* assigned step id */ + uint32_t jobid; /* assigned job id */ + uint32_t stepid; /* assigned step id */ + + job_state_t state; /* job state */ + pthread_mutex_t state_mutex; + pthread_cond_t state_cond; + slurm_job_credential_t *cred; char *nodelist; /* nodelist in string form */ int nhosts; @@ -22,6 +42,8 @@ typedef struct srun_job { int *ntask; /* number of tasks to run on each host*/ uint32_t *iaddr; /* in_addr vector */ + pthread_t sigid; /* signals thread tid */ + pthread_t jtid; /* job control thread id */ slurm_fd jfd; /* job control info fd */ slurm_addr jaddr; /* job control info port */ @@ -33,7 +55,10 @@ typedef struct srun_job { int *out; /* ntask stdout fds */ int *err; /* ntask stderr fds */ - int lastfd; /* temporary help for assigning io fds */ + int *task_status; /* ntask status (return codes) */ + task_state_t *task_state; /* ntask task states */ + pthread_mutex_t task_mutex; + } job_t; job_t * job_create(resource_allocation_response_msg_t *resp); diff --git a/src/srun/launch.c b/src/srun/launch.c index b9cddab7059037c2de9668f805cbcfebf07a8bf6..9b188ffcb7a5d890d759817f687796d79020bd23 100644 --- a/src/srun/launch.c +++ b/src/srun/launch.c @@ -14,19 +14,22 @@ #include "job.h" #include "opt.h" +extern char **environ; /* number of active threads */ +/* static pthread_mutex_t active_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t active_cond = PTHREAD_COND_INITIALIZER; static int active = 0; +static int timeout; +*/ /* array of nnodes launch threads initialize in launch() */ -static launch_thr_t *thr; - -static int timeout; +/* static launch_thr_t *thr; */ static void print_launch_msg(launch_tasks_request_msg_t *msg); +static int envcount(char **env); void * launch(void *arg) @@ -37,6 +40,11 @@ launch(void *arg) int i, j, taskid; char hostname[MAXHOSTNAMELEN]; + pthread_mutex_lock(&job->state_mutex); + job->state = SRUN_JOB_LAUNCHING; + pthread_cond_signal(&job->state_cond); + pthread_mutex_unlock(&job->state_mutex); + if (read_slurm_port_config() < 0) error("read_slurm_port_config: %d", slurm_strerror(errno)); @@ -56,8 +64,8 @@ launch(void *arg) msg.argv = remote_argv; msg.credential = job->cred; msg.job_step_id = 0; - msg.envc = 0; - msg.env = NULL; + msg.envc = envcount(environ); + msg.env = environ; msg.cwd = opt.cwd; slurm_set_addr_char(&msg.response_addr, ntohs(job->jaddr.sin_port), hostname); @@ -84,7 +92,13 @@ launch(void *arg) xfree(msg.global_task_ids); } - return 1; + pthread_mutex_lock(&job->state_mutex); + job->state = SRUN_JOB_STARTING; + pthread_cond_signal(&job->state_cond); + pthread_mutex_unlock(&job->state_mutex); + + + return(void *)(0); } @@ -101,3 +115,12 @@ static void print_launch_msg(launch_tasks_request_msg_t *msg) for (i = 0; i < msg->tasks_to_launch; i++) debug("global_task_id[%d] = %d\n", i, msg->global_task_ids[i]); } + +static int +envcount(char **environ) +{ + int envc = 0; + while (environ[envc] != NULL) + envc++; + return envc; +} diff --git a/src/srun/msg.c b/src/srun/msg.c index d43d61e5ea69c130a52547e3da269a992a802fec..ef2ca5aab1cbcc64271596ec454688ec450ac79e 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -8,25 +8,54 @@ #include <src/common/xassert.h> #include "job.h" +#include "opt.h" + +static int tasks_exited = 0; + +static void +_launch_handler(job_t *job, slurm_msg_t *resp) +{ + launch_tasks_response_msg_t *msg = + (launch_tasks_response_msg_t *) resp->data; + + if (msg->return_code != 0) { + error("recvd return code %d from %s", msg->return_code, + msg->node_name); + return; + } +} static void -_exit_handler(slurm_msg_t *exit_msg) +_exit_handler(job_t *job, slurm_msg_t *exit_msg) { task_exit_msg_t *msg = (task_exit_msg_t *) exit_msg->data; - verbose("task %d exited with status %d", msg->task_id, msg->return_code); + debug2("task %d exited with status %d", msg->task_id, msg->return_code); + pthread_mutex_lock(&job->task_mutex); + job->task_status[msg->task_id] = msg->return_code; + job->task_state[msg->task_id] = SRUN_TASK_EXITED; + pthread_mutex_unlock(&job->task_mutex); + + if (++tasks_exited == opt.nprocs) { + debug2("all tasks exited"); + pthread_mutex_lock(&job->state_mutex); + job->state = SRUN_JOB_OVERDONE; + pthread_cond_signal(&job->state_cond); + pthread_mutex_unlock(&job->state_mutex); + } + } static void -_handle_msg(slurm_msg_t *msg) +_handle_msg(job_t *job, slurm_msg_t *msg) { switch (msg->msg_type) { case RESPONSE_LAUNCH_TASKS: debug("recvd launch tasks response\n"); + _launch_handler(job, msg); break; case MESSAGE_TASK_EXIT: - debug("task exited\n"); - _exit_handler(msg); + _exit_handler(job, msg); break; case RESPONSE_REATTACH_TASKS_STREAMS: debug("recvd reattach response\n"); @@ -52,6 +81,9 @@ msg_thr(void *arg) xassert(job != NULL); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); + fd = job->jfd; while (1) { @@ -73,7 +105,7 @@ msg_thr(void *arg) msg->conn_fd = newfd; - _handle_msg(msg); + _handle_msg(job, msg); slurm_close_accepted_conn(newfd); diff --git a/src/srun/srun.c b/src/srun/srun.c index 4775dd54c8f69cd6751ac7c29d149e5b28552277..8a62ed8d900ae20a9a0464a09482f72134b2356a 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -33,12 +33,16 @@ typedef resource_allocation_response_msg_t allocation_resp; static allocation_resp * allocate_nodes(void); static void print_job_information(allocation_resp *resp); static void create_job_step(job_t *job); +static void sigterm_handler(int signum); +void * sig_thr(void *arg); int main(int ac, char **av) { + sigset_t sigset; allocation_resp *resp; job_t *job; + struct sigaction action; log_options_t logopt = LOG_OPTS_STDERR_ONLY; @@ -83,6 +87,15 @@ main(int ac, char **av) slurm_free_resource_allocation_response_msg(resp); } + /* block all signals in all threads, except sigterm */ + sigfillset(&sigset); + sigdelset(&sigset, SIGTERM); + if (sigprocmask(SIG_BLOCK, &sigset, NULL) != 0) + fatal("sigprocmask: %m"); + action.sa_handler = &sigterm_handler; + action.sa_flags = 0; + sigaction(SIGTERM, &action, NULL); + /* job structure should now be filled in */ if ((job->jfd = slurm_init_msg_engine_port(0)) == SLURM_SOCKET_ERROR) @@ -112,17 +125,31 @@ main(int ac, char **av) fatal("Unable to create message thread. %m\n"); debug("Started msg server thread (%d)\n", job->jtid); - /* launch jobs */ + /* spawn signal thread */ + if (pthread_create(&job->sigid, NULL, &sig_thr, (void *) job)) + fatal("Unable to create signals thread. %m"); + debug("Started signals thread (%d)", job->sigid); + /* launch jobs */ launch(job); - /* wait on and process signals */ - - sleep(120); + /* wait for job to terminate */ + while (job->state != SRUN_JOB_OVERDONE) { + pthread_cond_wait(&job->state_cond, &job->state_mutex); + debug("main thread woke up, state is now %d", job->state); + if (errno == EINTR) + debug("got signal"); + } + /* job is now overdone, blow this popsicle stand */ + if (!opt.no_alloc) slurm_complete_job(job->jobid); + pthread_kill(job->jtid, SIGTERM); + pthread_kill(job->ioid, SIGTERM); + pthread_kill(job->sigid, SIGTERM); + exit(0); } @@ -174,7 +201,8 @@ allocate_nodes(void) } -static void create_job_step(job_t *job) +static void +create_job_step(job_t *job) { job_step_create_request_msg_t req; job_step_create_response_msg_t *resp; @@ -212,3 +240,46 @@ print_job_information(allocation_resp *resp) } printf("\n"); } + +static void +sigterm_handler(int signum) +{ + if (signum == SIGTERM) { + debug2("thread %d canceled\n", pthread_self()); + pthread_exit(0); + } +} + + +/* simple signal handling thread */ +void * +sig_thr(void *arg) +{ + job_t *job = (job_t *)arg; + sigset_t set; + int signo; + struct sigaction action; + + + while (1) { + sigfillset(&set); + pthread_sigmask(SIG_UNBLOCK, &set, NULL); + sigwait(&set, &signo); + debug2("recvd signal %d", signo); + switch (signo) { + case SIGINT: + pthread_mutex_lock(&job->state_mutex); + job->state = SRUN_JOB_OVERDONE; + pthread_cond_signal(&job->state_cond); + pthread_mutex_unlock(&job->state_mutex); + pthread_exit(0); + break; + default: + /* fwd_signal(job, signo); */ + break; + } + } + + pthread_exit(0); +} +