From 85f339e3c60d6457d747c68910719c7fe5424712 Mon Sep 17 00:00:00 2001
From: Danny Auble <da@llnl.gov>
Date: Mon, 8 Aug 2005 18:12:07 +0000
Subject: [PATCH] mods for ctrl-z to work

---
 src/common/getopt.c      |   2 +-
 src/common/global_srun.c |  23 ++-
 src/srun/allocate.c      |  10 +-
 src/srun/allocate.h      |   2 +
 src/srun/attach.h        |   4 +-
 src/srun/io.c            |  14 +-
 src/srun/launch.c        |  41 ++++-
 src/srun/msg.c           | 314 ++++++++++++++++++++++++++++++++++-----
 src/srun/signals.c       |   8 +-
 src/srun/srun.c          |   5 +-
 src/srun/srun_job.c      |   8 +-
 src/srun/srun_job.h      |  23 ++-
 12 files changed, 385 insertions(+), 69 deletions(-)

diff --git a/src/common/getopt.c b/src/common/getopt.c
index 9bafa453c62..942fede61d0 100644
--- a/src/common/getopt.c
+++ b/src/common/getopt.c
@@ -514,7 +514,7 @@ _getopt_internal (argc, argv, optstring, longopts, longind, long_only)
   int print_errors = opterr;
   if (optstring[0] == ':')
     print_errors = 0;
-  
+
   if (argc < 1)
     return -1;
 
diff --git a/src/common/global_srun.c b/src/common/global_srun.c
index 5af987e38e9..5409c1dfa36 100644
--- a/src/common/global_srun.c
+++ b/src/common/global_srun.c
@@ -46,14 +46,6 @@
 #include "src/common/xsignal.h"
 #include "src/common/global_srun.h"
 
-/*
- * Static list of signals to block in srun:
- */
-static int srun_sigarray[] = {
-	SIGINT,  SIGQUIT, SIGTSTP, SIGCONT, SIGTERM,
-	SIGALRM, SIGUSR1, SIGUSR2, SIGPIPE, 0
-};
-
 /* number of active threads */
 static pthread_mutex_t active_mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  active_cond  = PTHREAD_COND_INITIALIZER;
@@ -73,6 +65,7 @@ typedef struct task_info {
 	int host_inx;
 } task_info_t;
 
+int message_thread = 0;
 
 /*
  * Static prototypes
@@ -87,13 +80,20 @@ fwd_signal(srun_job_t *job, int signo)
 	slurm_msg_t *req;
 	kill_tasks_msg_t msg;
 	static pthread_mutex_t sig_mutex = PTHREAD_MUTEX_INITIALIZER;
-	
+	pipe_enum_t pipe_enum = PIPE_SIGNALED;
+
 	slurm_mutex_lock(&sig_mutex);
 
 	if (signo == SIGKILL || signo == SIGINT || signo == SIGTERM) {
 		slurm_mutex_lock(&job->state_mutex);
 		job->signaled = true;
 		slurm_mutex_unlock(&job->state_mutex);
+		if(message_thread) {
+			write(job->par_msg->msg_pipe[1],
+			      &pipe_enum,sizeof(int));
+			write(job->par_msg->msg_pipe[1],
+			      &job->signaled,sizeof(int));
+		}
 	}
 
 	debug2("forward signal %d to job", signo);
@@ -110,7 +110,6 @@ fwd_signal(srun_job_t *job, int signo)
 			debug2("%s has not yet replied\n", job->host[i]);
 			continue;
 		}
-		
 		if (job_active_tasks_on_host(job, i) == 0)
 			continue;
 
@@ -166,7 +165,6 @@ static void _p_fwd_signal(slurm_msg_t *req, srun_job_t *job)
 		tinfo->req_ptr  = &req[i];
 		tinfo->job_ptr  = job;
 		tinfo->host_inx = i;
-
 		slurm_attr_init(&thd[i].attr);
 		if (pthread_attr_setdetachstate(&thd[i].attr,
 						PTHREAD_CREATE_DETACHED))
@@ -206,8 +204,9 @@ static void * _p_signal_task(void *args)
 	 * Report error unless it is "Invalid job id" which
 	 * probably just means the tasks exited in the meanwhile.
 	 */
-	if ((rc != 0) && (rc != ESLURM_INVALID_JOB_ID) && (rc != ESRCH))
+	if ((rc != 0) && (rc != ESLURM_INVALID_JOB_ID) && (rc != ESRCH)) {
 		error("%s: signal: %s", host, slurm_strerror(rc));
+	}
 
     done:
 	slurm_mutex_lock(&active_mutex);
diff --git a/src/srun/allocate.c b/src/srun/allocate.c
index 7fc9b283fc0..22e4a9131a9 100644
--- a/src/srun/allocate.c
+++ b/src/srun/allocate.c
@@ -67,7 +67,7 @@ static job_step_create_request_msg_t * _step_req_create(srun_job_t *j);
 static void _step_req_destroy(job_step_create_request_msg_t *r);
 
 static sig_atomic_t destroy_job = 0;
-
+static srun_job_t *allocate_job = NULL;
 
 int
 allocate_test(void)
@@ -187,7 +187,7 @@ _wait_for_resources(resource_allocation_response_msg_t **resp)
 		if (destroy_job) {
 			verbose("cancelling job %u", old.job_id);
 			slurm_complete_job(old.job_id, 0, 0);
-			debugger_launch_failure();
+			debugger_launch_failure(allocate_job);
 			exit(0);
 		}
 
@@ -524,3 +524,9 @@ create_job_step(srun_job_t *job)
 
 	return 0;
 }
+void
+set_allocate_job(srun_job_t *job)
+{
+	allocate_job = job;
+	return;
+}
diff --git a/src/srun/allocate.h b/src/srun/allocate.h
index c5b9afbf65f..7411f7006a0 100644
--- a/src/srun/allocate.h
+++ b/src/srun/allocate.h
@@ -85,5 +85,7 @@ uint32_t jobid_from_env(void);
  */
 int create_job_step(srun_job_t *j);
 
+/* set the job for debugging purpose */
+void set_allocate_job(srun_job_t *job);
 
 #endif /* !_HAVE_ALLOCATE_H */
diff --git a/src/srun/attach.h b/src/srun/attach.h
index dd093d04543..965f264b39b 100644
--- a/src/srun/attach.h
+++ b/src/srun/attach.h
@@ -29,7 +29,7 @@
 #define VOLATILE
 #endif
 #endif
-
+#include "src/srun/srun_job.h"
 /*****************************************************************************
  *                          DEBUGGING SUPPORT                                *
  *****************************************************************************/
@@ -75,7 +75,7 @@ extern int MPIR_i_am_starter;
 extern int MPIR_acquired_pre_main;
 extern void MPIR_Breakpoint(void);
 
-extern void debugger_launch_failure(void);
+extern void debugger_launch_failure(srun_job_t *job);
 
 /* Value for totalview %J expansion in bulk launch string
  */
diff --git a/src/srun/io.c b/src/srun/io.c
index 72c78b07cd5..e78f08e5122 100644
--- a/src/srun/io.c
+++ b/src/srun/io.c
@@ -155,9 +155,20 @@ _set_iofds_nonblocking(srun_job_t *job)
 static void
 _update_task_io_state(srun_job_t *job, int taskid)
 {
+	pipe_enum_t pipe_enum = PIPE_TASK_STATE;
+
 	slurm_mutex_lock(&job->task_mutex);
-	if (job->task_state[taskid] == SRUN_TASK_IO_WAIT)
+	if (job->task_state[taskid] == SRUN_TASK_IO_WAIT) {
 		job->task_state[taskid] = SRUN_TASK_EXITED;
+		if(message_thread) {
+			write(job->par_msg->msg_pipe[1],
+			      &pipe_enum,sizeof(int));
+			write(job->par_msg->msg_pipe[1],
+			      &taskid,sizeof(int));
+			write(job->par_msg->msg_pipe[1],
+			      &job->task_state[taskid],sizeof(int));
+		}
+	}
 	slurm_mutex_unlock(&job->task_mutex);
 }
 
@@ -365,7 +376,6 @@ _setup_pollfds(srun_job_t *job, struct pollfd *fds, fd_info_t *map)
 			pthread_exit(0);
 		}
 	}
-
 	return nfds;
 }
 
diff --git a/src/srun/launch.c b/src/srun/launch.c
index e25d62b08ab..b466bca8fb3 100644
--- a/src/srun/launch.c
+++ b/src/srun/launch.c
@@ -353,11 +353,35 @@ static void
 _update_failed_node(srun_job_t *j, int id)
 {
 	int i;
+	pipe_enum_t pipe_enum = PIPE_HOST_STATE;
+
 	pthread_mutex_lock(&j->task_mutex);
-	if (j->host_state[id] == SRUN_HOST_INIT)
+	if (j->host_state[id] == SRUN_HOST_INIT) {
 		j->host_state[id] = SRUN_HOST_UNREACHABLE;
-	for (i = 0; i < j->ntask[id]; i++)
+
+		if(message_thread) {
+			write(j->par_msg->msg_pipe[1],
+			      &pipe_enum,sizeof(int));
+			write(j->par_msg->msg_pipe[1],
+			      &id,sizeof(int));
+			write(j->par_msg->msg_pipe[1],
+			      &j->host_state[id],sizeof(int));
+		}
+	}
+
+	pipe_enum = PIPE_TASK_STATE;
+	for (i = 0; i < j->ntask[id]; i++) {
 		j->task_state[j->tids[id][i]] = SRUN_TASK_FAILED;
+
+		if(message_thread) {
+			write(j->par_msg->msg_pipe[1],
+			      &pipe_enum,sizeof(int));
+			write(j->par_msg->msg_pipe[1],
+			      &j->tids[id][i],sizeof(int));
+			write(j->par_msg->msg_pipe[1],
+			      &j->task_state[j->tids[id][i]],sizeof(int));
+		}
+	}
 	pthread_mutex_unlock(&j->task_mutex);
 
 	/* update_failed_tasks(j, id); */
@@ -366,9 +390,20 @@ _update_failed_node(srun_job_t *j, int id)
 static void
 _update_contacted_node(srun_job_t *j, int id)
 {
+	pipe_enum_t pipe_enum = PIPE_HOST_STATE;
+
 	pthread_mutex_lock(&j->task_mutex);
-	if (j->host_state[id] == SRUN_HOST_INIT)
+	if (j->host_state[id] == SRUN_HOST_INIT) {
 		j->host_state[id] = SRUN_HOST_CONTACTED;
+		if(message_thread) {
+			write(j->par_msg->msg_pipe[1],
+			      &pipe_enum,sizeof(int));
+			write(j->par_msg->msg_pipe[1],
+			      &id,sizeof(int));
+			write(j->par_msg->msg_pipe[1],
+			      &j->host_state[id],sizeof(int));
+		}
+	}
 	pthread_mutex_unlock(&j->task_mutex);
 }
 
diff --git a/src/srun/msg.c b/src/srun/msg.c
index 9d091a16f62..d62b23135cf 100644
--- a/src/srun/msg.c
+++ b/src/srun/msg.c
@@ -102,7 +102,6 @@ static void	_node_fail_handler(char *nodelist, srun_job_t *job);
 #define _poll_wr_isset(pfd) ((pfd).revents & POLLOUT)
 #define _poll_err(pfd)      ((pfd).revents & POLLERR)
 
-
 /*
  * Install entry in the MPI_proctable for host with node id `nodeid'
  * and the number of tasks `ntasks' with pid array `pid'
@@ -112,29 +111,64 @@ _build_proctable(srun_job_t *job, char *host, int nodeid, int ntasks, uint32_t *
 {
 	int i;
 	static int tasks_recorded = 0;
-	
+	pipe_enum_t pipe_enum = PIPE_MPIR_PROCTABLE_SIZE;
+
 	if (MPIR_proctable_size == 0) {
 		MPIR_proctable_size = opt.nprocs;
-		MPIR_proctable = xmalloc(sizeof(MPIR_PROCDESC) * opt.nprocs);
-		totalview_jobid = NULL;
-		xstrfmtcat(totalview_jobid, "%lu", job->jobid);
+/* 		MPIR_proctable = xmalloc(sizeof(MPIR_PROCDESC) * opt.nprocs); */
+/* 		totalview_jobid = NULL; */
+/* 		xstrfmtcat(totalview_jobid, "%lu", job->jobid); */
+
+		if(message_thread) {
+			write(job->par_msg->msg_pipe[1],
+			      &pipe_enum,sizeof(int));
+			write(job->par_msg->msg_pipe[1],
+			      &opt.nprocs,sizeof(int));
+
+			pipe_enum = PIPE_MPIR_TOTALVIEW_JOBID;
+			write(job->par_msg->msg_pipe[1],
+			      &pipe_enum,sizeof(int));
+			write(job->par_msg->msg_pipe[1],
+			      &job->jobid,sizeof(int));
+		}
 	}
 
 	for (i = 0; i < ntasks; i++) {
 		int taskid          = job->tids[nodeid][i];
-		MPIR_PROCDESC *tv   = &MPIR_proctable[taskid];
-		tv->host_name       = job->host[nodeid];
-		tv->executable_name = remote_argv[0];
-		tv->pid             = pid[i];
+		/* MPIR_PROCDESC *tv   = &MPIR_proctable[taskid]; */
+/* 		tv->host_name       = job->host[nodeid]; */
+/* 		tv->executable_name = remote_argv[0]; */
+/* 		tv->pid             = pid[i]; */
+
+		if(message_thread) {
+			pipe_enum = PIPE_MPIR_PROCDESC;
+			write(job->par_msg->msg_pipe[1],
+			      &pipe_enum,sizeof(int));
+			write(job->par_msg->msg_pipe[1],
+			      &taskid,sizeof(int));
+			write(job->par_msg->msg_pipe[1],
+			      &nodeid,sizeof(int));
+			write(job->par_msg->msg_pipe[1],
+			      &pid[i],sizeof(int));
+		}
 		tasks_recorded++;
 	}
 
 	if (tasks_recorded == opt.nprocs) {
-		MPIR_debug_state = MPIR_DEBUG_SPAWNED;
-		MPIR_Breakpoint();
-		if (opt.debugger_test)
-			_dump_proctable(job);
+		/* MPIR_debug_state = MPIR_DEBUG_SPAWNED; */
+/* 		MPIR_Breakpoint(); */
+/* 		if (opt.debugger_test) */
+/* 			_dump_proctable(job); */
+
+		if(message_thread) {
+			i = MPIR_DEBUG_SPAWNED;
+			pipe_enum = PIPE_MPIR_DEBUG_STATE;
+			write(job->par_msg->msg_pipe[1],
+			      &pipe_enum,sizeof(int));
+			write(job->par_msg->msg_pipe[1],
+			      &i,sizeof(int));
+		}
 	}
 }
 
@@ -160,11 +194,24 @@ static void _dump_proctable(srun_job_t *job)
 	}
 }
 
-void debugger_launch_failure(void)
+void debugger_launch_failure(srun_job_t *job)
 {
+	int i;
+	pipe_enum_t pipe_enum = PIPE_MPIR_DEBUG_STATE;
+
 	if (opt.parallel_debug) {
-		MPIR_debug_state = MPIR_DEBUG_ABORTING;
-		MPIR_Breakpoint();
+		/* MPIR_debug_state = MPIR_DEBUG_ABORTING; */
+/* 		MPIR_Breakpoint(); */
+		if(message_thread && job) {
+			i = MPIR_DEBUG_ABORTING;
+			write(job->par_msg->msg_pipe[1],
+			      &pipe_enum,sizeof(int));
+			write(job->par_msg->msg_pipe[1],
+			      &i,sizeof(int));
+		} else if(!job) {
+			error("Hey I don't have a job to write to on the "
+			      "failure of the debugger launch.");
+		}
 	}
 }
 
@@ -218,6 +265,8 @@ static bool _job_msg_done(srun_job_t *job)
 static void
 _process_launch_resp(srun_job_t *job, launch_tasks_response_msg_t *msg)
 {
+	pipe_enum_t pipe_enum = PIPE_HOST_STATE;
+
 	if ((msg->srun_node_id < 0) || (msg->srun_node_id >= job->nhosts)) {
 		error ("Bad launch response from %s", msg->node_name);
 		return;
@@ -226,23 +275,40 @@ _process_launch_resp(srun_job_t *job, launch_tasks_response_msg_t *msg)
 	pthread_mutex_lock(&job->task_mutex);
 	job->host_state[msg->srun_node_id] = SRUN_HOST_REPLIED;
 	pthread_mutex_unlock(&job->task_mutex);
+
+	if(message_thread) {
+		write(job->par_msg->msg_pipe[1],&pipe_enum,sizeof(int));
+		write(job->par_msg->msg_pipe[1],
+		      &msg->srun_node_id,sizeof(int));
+		write(job->par_msg->msg_pipe[1],
+		      &job->host_state[msg->srun_node_id],sizeof(int));
+
+	}
 	
 	_build_proctable( job, msg->node_name, msg->srun_node_id,
 			  msg->count_of_pids, msg->local_pids   );
 	
 	_print_pid_list( msg->node_name, msg->count_of_pids,
-			 msg->local_pids, remote_argv[0]     );
-	
+			 msg->local_pids, remote_argv[0]     );
+	
 }
 
 static void
 update_running_tasks(srun_job_t *job, uint32_t nodeid)
 {
 	int i;
+	pipe_enum_t pipe_enum = PIPE_TASK_STATE;
 	debug2("updating %d running tasks for node %d",
 	       job->ntask[nodeid], nodeid);
 	slurm_mutex_lock(&job->task_mutex);
 	for (i = 0; i < job->ntask[nodeid]; i++) {
 		uint32_t tid = job->tids[nodeid][i];
 		job->task_state[tid] = SRUN_TASK_RUNNING;
+
+		if(message_thread) {
+			write(job->par_msg->msg_pipe[1],&pipe_enum,sizeof(int));
+			write(job->par_msg->msg_pipe[1],&tid,sizeof(int));
+			write(job->par_msg->msg_pipe[1],
+			      &job->task_state[tid],sizeof(int));
+		}
 	}
 	slurm_mutex_unlock(&job->task_mutex);
 }
@@ -251,10 +317,20 @@ static void
 update_failed_tasks(srun_job_t *job, uint32_t nodeid)
 {
 	int i;
+	pipe_enum_t pipe_enum = PIPE_TASK_STATE;
+
 	slurm_mutex_lock(&job->task_mutex);
 	for (i = 0; i < job->ntask[nodeid]; i++) {
 		uint32_t tid = job->tids[nodeid][i];
 		job->task_state[tid] = SRUN_TASK_FAILED;
+
+		if(message_thread) {
+			write(job->par_msg->msg_pipe[1],
+			      &pipe_enum,sizeof(int));
+			write(job->par_msg->msg_pipe[1],&tid,sizeof(int));
+			write(job->par_msg->msg_pipe[1],
+			      &job->task_state[tid],sizeof(int));
+		}
 		tasks_exited++;
 	}
 	slurm_mutex_unlock(&job->task_mutex);
@@ -269,7 +345,8 @@ static void
 _launch_handler(srun_job_t *job, slurm_msg_t *resp)
 {
 	launch_tasks_response_msg_t *msg = resp->data;
-	
+	pipe_enum_t pipe_enum = PIPE_HOST_STATE;
+
 	debug2("received launch resp from %s nodeid=%d",
 	       msg->node_name, msg->srun_node_id);
 
@@ -281,7 +358,15 @@ _launch_handler(srun_job_t *job, slurm_msg_t *resp)
 		slurm_mutex_lock(&job->task_mutex);
 		job->host_state[msg->srun_node_id] = SRUN_HOST_REPLIED;
 		slurm_mutex_unlock(&job->task_mutex);
-		
+
+		if(message_thread) {
+			write(job->par_msg->msg_pipe[1],
+			      &pipe_enum,sizeof(int));
+			write(job->par_msg->msg_pipe[1],
+			      &msg->srun_node_id,sizeof(int));
+			write(job->par_msg->msg_pipe[1],
+			      &job->host_state[msg->srun_node_id],sizeof(int));
+		}
 		update_failed_tasks(job, msg->srun_node_id);
 
 		/*
@@ -291,7 +376,7 @@ _launch_handler(srun_job_t *job, slurm_msg_t *resp)
 		} else 
 			update_failed_tasks(job, msg->srun_node_id);
 		*/
-		debugger_launch_failure();
+		debugger_launch_failure(job);
 		return;
 	} else {
 		_process_launch_resp(job, msg);
@@ -330,7 +415,8 @@ _reattach_handler(srun_job_t *job, slurm_msg_t *msg)
 {
 	int i;
 	reattach_tasks_response_msg_t *resp = msg->data;
-	
+	pipe_enum_t pipe_enum = PIPE_HOST_STATE;
+
 	if ((resp->srun_node_id < 0) || (resp->srun_node_id >= job->nhosts)) {
 		error ("Invalid reattach response received");
 		return;
@@ -340,6 +426,14 @@ _reattach_handler(srun_job_t *job, slurm_msg_t *msg)
 	job->host_state[resp->srun_node_id] = SRUN_HOST_REPLIED;
 	slurm_mutex_unlock(&job->task_mutex);
 
+	if(message_thread) {
+		write(job->par_msg->msg_pipe[1],&pipe_enum,sizeof(int));
+		write(job->par_msg->msg_pipe[1],
+		      &resp->srun_node_id,sizeof(int));
+		write(job->par_msg->msg_pipe[1],
+		      &job->host_state[resp->srun_node_id],sizeof(int));
+	}
+
 	if (resp->return_code != 0) {
 		if (job->stepid == NO_VAL) {
 			error ("Unable to attach to job %d: %s",
@@ -394,13 +488,13 @@ _print_exit_status(srun_job_t *job, hostlist_t hl, char *host, int status)
 	char *corestr = "";
 	bool signaled  = false;
 	void (*print) (const char *, ...) = (void *) &error;
-	
+
 	xassert(hl != NULL);
 
 	slurm_mutex_lock(&job->state_mutex);
 	signaled = job->signaled;
 	slurm_mutex_unlock(&job->state_mutex);
-	
+
 	/*
 	 *  Print message that task was signaled as verbose message
 	 *  not error message if the user generated the signal.
@@ -480,6 +574,7 @@ _exit_handler(srun_job_t *job, slurm_msg_t *exit_msg)
 		else 
 			job->task_state[taskid] = SRUN_TASK_EXITED;
 	}
+
 	slurm_mutex_unlock(&job->task_mutex);
 
 	tasks_exited++;
@@ -735,7 +830,6 @@ _msg_thr_poll(srun_job_t *job)
 		_poll_set_rd(fds[i], slurmctld_fd);
 
 	while (!_job_msg_done(job)) {
-
 		if (_do_poll(job, fds, _get_next_timeout(job)) == 0) {
 			_do_poll_timeout(job);
 			continue;
@@ -750,6 +844,7 @@ _msg_thr_poll(srun_job_t *job)
 			else if (revents & POLLIN) 
 				_accept_msg_connection(job, i);
 		}
+
 	}
 	xfree(fds);	/* if we were to break out of while loop */
 }
@@ -758,13 +853,111 @@ void *
 msg_thr(void *arg)
 {
 	srun_job_t *job = (srun_job_t *) arg;
-	
+	par_to_msg_t *par_msg = job->par_msg;
+	int done = 0;
 	debug3("msg thread pid = %lu", (unsigned long) getpid());
 
 	slurm_uid = (uid_t) slurm_get_slurm_user_id();
 
 	_msg_thr_poll(job);
 
+	close(par_msg->msg_pipe[1]); // close excess fildes
+
 	debug3("msg thread done");
+	return (void *)1;
+}
+
+void *
+par_thr(void *arg)
+{
+	srun_job_t *job = (srun_job_t *) arg;
+	par_to_msg_t *par_msg = job->par_msg;
+	par_to_msg_t *msg_par = job->msg_par;
+	int c;
+	pipe_enum_t type=0;
+	int tid=-1;
+	int nodeid=-1;
+	int status;
+	debug3("par thread pid = %lu", (unsigned long) getpid());
+
+	//slurm_uid = (uid_t) slurm_get_slurm_user_id();
+
+	close(msg_par->msg_pipe[0]); // close read end of pipe
+	close(par_msg->msg_pipe[1]); // close write end of pipe
+	while(read(par_msg->msg_pipe[0],&c,sizeof(int))>0) {
+		// getting info from msg thread
+		if(type == PIPE_NONE) {
+			debug2("got type %d\n",c);
+			type = c;
+			continue;
+		}
+
+		if(type == PIPE_JOB_STATE) {
+			update_job_state(job, c);
+		} else if(type == PIPE_TASK_STATE) {
+			if(tid == -1) {
+				tid = c;
+				continue;
+			}
+			slurm_mutex_lock(&job->task_mutex);
+			job->task_state[tid] = c;
+			if(c == SRUN_TASK_FAILED)
+				tasks_exited++;
+			slurm_mutex_unlock(&job->task_mutex);
+			if (tasks_exited == 
+			    opt.nprocs) {
+				debug2("all tasks exited");
+				update_job_state(job, SRUN_JOB_TERMINATED);
+			}
+			tid = -1;
+		} else if(type == PIPE_HOST_STATE) {
+			if(tid == -1) {
+				tid = c;
+				continue;
+			}
+			slurm_mutex_lock(&job->task_mutex);
+			job->host_state[tid] = c;
+			slurm_mutex_unlock(&job->task_mutex);
+			tid = -1;
+		} else if(type == PIPE_SIGNALED) {
+			slurm_mutex_lock(&job->state_mutex);
+			job->signaled = c;
+			slurm_mutex_unlock(&job->state_mutex);
+		} else if(type == PIPE_MPIR_PROCTABLE_SIZE) {
+			if(MPIR_proctable_size == 0) {
+				MPIR_proctable_size = c;
+				MPIR_proctable =
+					xmalloc(sizeof(MPIR_PROCDESC) * c);
+			}
+		} else if(type == PIPE_MPIR_TOTALVIEW_JOBID) {
+			totalview_jobid = NULL;
+			xstrfmtcat(totalview_jobid, "%lu", c);
+		} else if(type == PIPE_MPIR_PROCDESC) {
+			if(tid == -1) {
+				tid = c;
+				continue;
+			}
+			if(nodeid == -1) {
+				nodeid = c;
+				continue;
+			}
+			MPIR_PROCDESC *tv = &MPIR_proctable[tid];
+			tv->host_name = job->host[nodeid];
+			tv->executable_name = remote_argv[0];
+			tv->pid = c;
+			tid = -1;
+			nodeid = -1;
+		} else if(type == PIPE_MPIR_DEBUG_STATE) {
+			MPIR_debug_state = c;
+			MPIR_Breakpoint();
+			if (opt.debugger_test)
+				_dump_proctable(job);
+		}
+		type = PIPE_NONE;
+
+	}
+	close(par_msg->msg_pipe[0]); // close excess fildes
+	close(msg_par->msg_pipe[1]); // close excess fildes
+	if(waitpid(par_msg->pid,&status,0)<0) // wait for pid to finish
+		return;// there was an error
+	debug3("par thread done");
 	return (void *)1;
 }
 
@@ -773,27 +966,72 @@ msg_thr_create(srun_job_t *job)
 {
 	int i;
 	pthread_attr_t attr;
+	int c;
+	job->par_msg = xmalloc(sizeof(par_to_msg_t));
+	job->msg_par = xmalloc(sizeof(par_to_msg_t));
+	par_to_msg_t *par_msg = job->par_msg;
+	par_to_msg_t *msg_par = job->msg_par;
+
+	set_allocate_job(job);
 
 	for (i = 0; i < job->njfds; i++) {
 		if ((job->jfd[i] = slurm_init_msg_engine_port(0)) < 0)
 			fatal("init_msg_engine_port: %m");
-		if (slurm_get_stream_addr(job->jfd[i], &job->jaddr[i]) < 0)
+		if (slurm_get_stream_addr(job->jfd[i], &job->jaddr[i])
+		    < 0)
 			fatal("slurm_get_stream_addr: %m");
 		debug("initialized job control port %d\n",
-		      ntohs(((struct sockaddr_in)job->jaddr[i]).sin_port));
+		      ntohs(((struct sockaddr_in)
+			     job->jaddr[i]).sin_port));
 	}
 
-	slurm_attr_init(&attr);
-	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-	if ((errno = pthread_create(&job->jtid, &attr, &msg_thr,
-				    (void *)job)))
-		fatal("Unable to start message thread: %m");
-
-	debug("Started msg server thread (%lu)", (unsigned long) job->jtid);
+	if (pipe(par_msg->msg_pipe) == -1)
+		return SLURM_ERROR; // there was an error
+	if (pipe(msg_par->msg_pipe) == -1)
+		return SLURM_ERROR; // there was an error
+	debug2("created the pipes for communication");
+	if((par_msg->pid = fork()) == -1)
+		return SLURM_ERROR; // there was an error
+	else if (par_msg->pid == 0)
+	{ // child:
+		setsid();
+		message_thread = 1;
+		close(par_msg->msg_pipe[0]); // close read end of pipe
+		close(msg_par->msg_pipe[1]); // close write end of pipe
+		slurm_attr_init(&attr);
+		pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+		if ((errno = pthread_create(&job->jtid, &attr, &msg_thr,
+					    (void *)job)))
+			fatal("Unable to start msg to parent thread: %m");
+
+		debug("Started msg to parent server thread (%lu)",
+		      (unsigned long) job->jtid);
+
+		while(read(msg_par->msg_pipe[0],&c,sizeof(int))>0)
+			; // make sure my parent doesn't leave me hangin
+
+		close(msg_par->msg_pipe[0]); // close excess fildes
+		xfree(par_msg);
+		xfree(msg_par);
+		_exit(0);
+	}
+	else
+	{ // parent:
+
+		slurm_attr_init(&attr);
+		pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+		if ((errno = pthread_create(&job->jtid, &attr, &par_thr,
+					    (void *)job)))
+			fatal("Unable to start parent to msg thread: %m");
+
+		debug("Started parent to msg server thread (%lu)",
+		      (unsigned long) job->jtid);
+	}
 
 	return SLURM_SUCCESS;
 }
-	
+
 static void 
 _print_pid_list(const char *host, int ntasks, uint32_t *pid, 
 		char *executable_name)
@@ -802,12 +1040,12 @@ _print_pid_list(const char *host, int ntasks, uint32_t *pid,
 		int i;
 		hostlist_t pids = hostlist_create(NULL);
 		char buf[1024];
-		
+
 		for (i = 0; i < ntasks; i++) {
 			snprintf(buf, sizeof(buf), "pids:%d", pid[i]);
 			hostlist_push(pids, buf);
 		}
-		
+
 		hostlist_ranged_string(pids, sizeof(buf), buf);
 		verbose("%s: %s %s", host, executable_name, buf);
 	}
diff --git a/src/srun/signals.c b/src/srun/signals.c
index dc092ac7a33..77f31758dad 100644
--- a/src/srun/signals.c
+++ b/src/srun/signals.c
@@ -51,7 +51,7 @@
  *  Static list of signals to block in srun:
  */
 static int srun_sigarray[] = {
-	SIGINT,  SIGQUIT, SIGTSTP, SIGCONT, SIGTERM,
+	SIGINT,  SIGQUIT, /*SIGTSTP,*/ SIGCONT, SIGTERM,
 	SIGALRM, SIGUSR1, SIGUSR2, SIGPIPE, 0
 };
 
@@ -190,9 +190,9 @@ _sig_thr(void *arg)
 		case SIGINT:
 			_handle_intr(job, &last_intr, &last_intr_sent);
 			break;
-		case SIGTSTP:
-			debug3("got SIGTSTP");
-			break;
+		/* case SIGTSTP: */
+/* 			debug3("got SIGTSTP"); */
+/* 			break; */
 		case SIGCONT:
 			debug3("got SIGCONT");
 			break;
diff --git a/src/srun/srun.c b/src/srun/srun.c
index 9ffebab1229..73f5f9c8241 100644
--- a/src/srun/srun.c
+++ b/src/srun/srun.c
@@ -273,10 +273,11 @@ int srun(int ac, char **av)
 
 	/* wait for job to terminate */
 	slurm_mutex_lock(&job->state_mutex);
-	while (job->state < SRUN_JOB_TERMINATED)
+	while (job->state < SRUN_JOB_TERMINATED) {
 		pthread_cond_wait(&job->state_cond, &job->state_mutex);
+	}
 	slurm_mutex_unlock(&job->state_mutex);
-	
+
 	/* job is now overdone, clean up 
 	 *
 	 * If job is "forcefully terminated" exit immediately.
diff --git a/src/srun/srun_job.c b/src/srun/srun_job.c
index 11ab98b7f3a..db791f80a0b 100644
--- a/src/srun/srun_job.c
+++ b/src/srun/srun_job.c
@@ -252,10 +252,16 @@ job_create_noalloc(void)
 void
 update_job_state(srun_job_t *job, srun_job_state_t state)
 {
+	pipe_enum_t pipe_enum = PIPE_JOB_STATE;
 	pthread_mutex_lock(&job->state_mutex);
 	if (job->state < state) {
 		job->state = state;
+		if(message_thread) {
+			write(job->par_msg->msg_pipe[1],&pipe_enum,sizeof(int));
+			write(job->par_msg->msg_pipe[1],&job->state,sizeof(int));
+		}
 		pthread_cond_signal(&job->state_cond);
+		
 	}
 	pthread_mutex_unlock(&job->state_mutex);
 }
@@ -345,7 +351,7 @@ srun_job_destroy(srun_job_t *job, int error)
 		return;
 	}
 
-	if (error) debugger_launch_failure();
+	if (error) debugger_launch_failure(job);
 
 	job->removed = true;
 }
diff --git a/src/srun/srun_job.h b/src/srun/srun_job.h
index e00510f924e..3bfe25a647f 100644
--- a/src/srun/srun_job.h
+++ b/src/srun/srun_job.h
@@ -44,6 +44,18 @@
 #include "src/srun/signals.h"
 #include "src/srun/fname.h"
 
+typedef enum {
+	PIPE_NONE = 0,
+	PIPE_JOB_STATE,
+	PIPE_TASK_STATE,
+	PIPE_HOST_STATE,
+	PIPE_SIGNALED,
+	PIPE_MPIR_PROCTABLE_SIZE,
+	PIPE_MPIR_TOTALVIEW_JOBID,
+	PIPE_MPIR_PROCDESC,
+	PIPE_MPIR_DEBUG_STATE
+} pipe_enum_t;
+
 typedef enum {
 	SRUN_JOB_INIT = 0,         /* Job's initial state                   */
 	SRUN_JOB_LAUNCHING,        /* Launch thread is running              */
@@ -55,7 +67,7 @@ typedef enum {
 	SRUN_JOB_DONE,             /* tasks and IO complete                 */
 	SRUN_JOB_DETACHED,         /* Detached IO from job (Not used now)   */
 	SRUN_JOB_FAILED,           /* Job failed for some reason            */
-	SRUN_JOB_FORCETERM,        /* Forced termination of IO thread       */
+	SRUN_JOB_FORCETERM         /* Forced termination of IO thread       */
 } srun_job_state_t;
 
 typedef enum {
@@ -74,6 +86,10 @@ typedef enum {
 	SRUN_TASK_ABNORMAL_EXIT
 } srun_task_state_t;
 
+typedef struct par_to_msg {
+	int msg_pipe[2];
+	int pid;
+} par_to_msg_t;
 
 typedef struct srun_job {
 	uint32_t jobid;		/* assigned job id 	                  */
@@ -143,10 +159,13 @@ typedef struct srun_job {
 	FILE *errstream;
 	int stdinfd;
 	bool *stdin_eof;	/* true if task i processed stdin eof  */
-	
+	par_to_msg_t *par_msg;
+	par_to_msg_t *msg_par;
 	select_jobinfo_t select_jobinfo;
 } srun_job_t;
 
+extern int message_thread;
+
 void    update_job_state(srun_job_t *job, srun_job_state_t newstate);
 void    job_force_termination(srun_job_t *job);
 
-- 
GitLab
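
Background note on the mechanism above: every state change in the forked
message process is shipped to the parent srun as an untyped record on
par_msg->msg_pipe, a pipe_enum_t tag followed by one or more int payload
words, and par_thr() decodes the stream with a small state machine (tag
first, then tid/nodeid as needed). The standalone C sketch below reproduces
the same tag-plus-payload framing outside of srun; the names msg_tag_t,
TAG_* and send_tagged are illustrative only and do not appear in the patch.

/*
 * Standalone sketch of the tag + payload pipe protocol used between
 * the forked message process and the parent in this patch.  The
 * type names and tags here are illustrative, not SLURM identifiers.
 */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

typedef enum { TAG_NONE = 0, TAG_JOB_STATE, TAG_TASK_STATE } msg_tag_t;

/* writer side: one int for the tag, then one int per payload word */
static void send_tagged(int fd, msg_tag_t tag, const int *vals, int nvals)
{
	int t = tag, i;

	if (write(fd, &t, sizeof(int)) != sizeof(int))
		exit(1);
	for (i = 0; i < nvals; i++)
		if (write(fd, &vals[i], sizeof(int)) != sizeof(int))
			exit(1);
}

int main(void)
{
	int pfd[2], c;

	if (pipe(pfd) == -1)
		return 1;

	if (fork() == 0) {	/* child: plays the role of msg_thr */
		int task[2] = { 3, 42 };	/* taskid, new state */
		int jstate = 7;

		close(pfd[0]);
		send_tagged(pfd[1], TAG_TASK_STATE, task, 2);
		send_tagged(pfd[1], TAG_JOB_STATE, &jstate, 1);
		close(pfd[1]);
		_exit(0);
	}

	/* parent: plays the role of par_thr's decode loop */
	close(pfd[1]);
	{
		msg_tag_t type = TAG_NONE;
		int tid = -1;

		while (read(pfd[0], &c, sizeof(int)) > 0) {
			if (type == TAG_NONE) {	/* first word is the tag */
				type = (msg_tag_t) c;
				continue;
			}
			if (type == TAG_TASK_STATE) {
				if (tid == -1) {  /* first payload word */
					tid = c;
					continue;
				}
				printf("task %d -> state %d\n", tid, c);
				tid = -1;
			} else if (type == TAG_JOB_STATE) {
				printf("job state -> %d\n", c);
			}
			type = TAG_NONE;  /* ready for the next record */
		}
	}
	close(pfd[0]);
	wait(NULL);
	return 0;
}

Since each record is a series of write(2) calls of sizeof(int), far below
PIPE_BUF, from a single writer, the words arrive intact and in order,
which is the same property the patch depends on.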
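Why the MPIR_* updates in _build_proctable() are proxied over the pipe
instead of being applied where they occur: a debugger such as TotalView
attaches to the parent srun, so MPIR_proctable, totalview_jobid and
MPIR_debug_state must be populated in the parent's address space, not in
the forked message process. par_thr() therefore rebuilds each proctable
entry from the taskid/nodeid/pid words it reads. A minimal sketch of that
parent-side bookkeeping follows, assuming the standard MPIR struct layout
that attach.h declares; record_task is a hypothetical helper, not a
function in the patch.

#include <stdio.h>
#include <stdlib.h>

/* Standard MPIR debugging-interface process descriptor */
typedef struct {
	char *host_name;	/* node the task runs on */
	char *executable_name;	/* program being debugged */
	int   pid;		/* pid of the task on that node */
} MPIR_PROCDESC;

static MPIR_PROCDESC *MPIR_proctable;
static int MPIR_proctable_size;

/* hypothetical parent-side handler for one PIPE_MPIR_PROCDESC record,
 * after the taskid/nodeid/pid ints have been read off the pipe */
static void record_task(int taskid, char *host, char *exe, int pid)
{
	MPIR_PROCDESC *tv = &MPIR_proctable[taskid];

	tv->host_name = host;
	tv->executable_name = exe;
	tv->pid = pid;
}

int main(void)
{
	MPIR_proctable_size = 2;
	MPIR_proctable = calloc(MPIR_proctable_size, sizeof(MPIR_PROCDESC));

	record_task(0, "node0", "a.out", 1234);
	record_task(1, "node1", "a.out", 1235);

	printf("task 0: %s pid %d\n",
	       MPIR_proctable[0].host_name, MPIR_proctable[0].pid);
	free(MPIR_proctable);
	return 0;
}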
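The ctrl-z behavior itself falls out of the signals.c hunk: with SIGTSTP
no longer blocked via srun_sigarray or swallowed by the signal thread, it
keeps its default disposition and stops the foreground srun, while the
message process, running in its own session after setsid(), keeps
servicing slurmd traffic. A minimal sketch of that signal-mask
arrangement, assuming only srun's 0-terminated sigarray convention:

/*
 * Sketch of the signal-mask arrangement after this patch: SIGTSTP is
 * left out of the blocked set, so ctrl-z stops the process normally.
 */
#include <signal.h>
#include <stdio.h>
#include <unistd.h>

static int sigarray[] = {
	SIGINT, SIGQUIT, /* SIGTSTP intentionally omitted */ SIGCONT,
	SIGTERM, SIGALRM, SIGUSR1, SIGUSR2, SIGPIPE, 0
};

int main(void)
{
	sigset_t set;
	int i;

	sigemptyset(&set);
	for (i = 0; sigarray[i]; i++)
		sigaddset(&set, sigarray[i]);

	/* Block everything in sigarray; SIGTSTP keeps its default
	 * action, so ctrl-z suspends this process as usual. */
	if (sigprocmask(SIG_BLOCK, &set, NULL) < 0) {
		perror("sigprocmask");
		return 1;
	}

	printf("pid %d: ctrl-z will stop me; SIGINT is blocked\n",
	       (int) getpid());
	pause();	/* wait here; try ctrl-z from the terminal */
	return 0;
}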