diff --git a/src/srun/msg.c b/src/srun/msg.c
index caa968b39bfe375b182627eeea4978ab25602739..1b9a668309233d7200cf1cd53104ef7420d1ac04 100644
--- a/src/srun/msg.c
+++ b/src/srun/msg.c
@@ -122,62 +122,132 @@ static void _node_fail_handler(char *nodelist, srun_job_t *job);
 	} \
 } while (0)
 
-/*
- * Install entry in the MPI_proctable for host with node id `nodeid'
- * and the number of tasks `ntasks' with pid array `pid'
- */
-static void
-_build_proctable(srun_job_t *job, char *host, int nodeid, int ntasks, uint32_t *pid)
+
+/*
+ * Send one node's MPIR proctable entries to the main srun process.
+ *
+ * fd is job->forked_msg->par_msg->msg_pipe[1].  The stream written is:
+ * int msg_type and int dummy (both read by par_thr()), then int nodeid,
+ * int ntasks, int len, len bytes of executable (only if len > 0; len
+ * counts the trailing NUL) and one (taskid, pid) int pair per task.
+ * A NULL executable is sent as len 0.
+ */
+static void _update_mpir_proctable(int fd, srun_job_t *job,
+				   int nodeid, int ntasks, uint32_t *pid,
+				   char *executable)
 {
+	int msg_type = PIPE_UPDATE_MPIR_PROCTABLE;
+	int dummy = 0xdeadbeef;
+	int len;
 	int i;
+
+	xassert(message_thread);
+	safe_write(fd, &msg_type, sizeof(int)); /* read by par_thr() */
+	safe_write(fd, &dummy, sizeof(int));    /* read by par_thr() */
+
+	/* the rest are read by _handle_update_mpir_proctable() */
+	safe_write(fd, &nodeid, sizeof(int));
+	safe_write(fd, &ntasks, sizeof(int));
+	/* strlen(NULL) is undefined, so a missing name is sent as len 0 */
+	len = executable ? (int)strlen(executable) + 1 : 0;
+	safe_write(fd, &len, sizeof(int));
+	if (len > 0) {
+		safe_write(fd, executable, len);
+	}
+	for (i = 0; i < ntasks; i++) {
+		int taskid = job->step_layout->tids[nodeid][i];
+		safe_write(fd, &taskid, sizeof(int));
+		safe_write(fd, &pid[i], sizeof(int));
+	}
+
+	return;
+
+rwfail:
+	error("write to srun main process failed");
+	return;
+}
+
+/*
+ * Main-process counterpart of _update_mpir_proctable(): read one node's
+ * entries from fd and store them in MPIR_proctable.  Once every task of
+ * the step has been recorded, set MPIR_debug_state and call
+ * MPIR_Breakpoint().
+ */
+static void _handle_update_mpir_proctable(int fd, srun_job_t *job)
+{
 	static int tasks_recorded = 0;
-	pipe_enum_t pipe_enum = PIPE_MPIR_PROCTABLE_SIZE;
-
+	int nodeid;
+	int ntasks;
+	int len;
+	char *executable = NULL;
+	int name_owned = 0;	/* set once something points at executable */
+	int i;
+
+	/* some initialization */
 	if (MPIR_proctable_size == 0) {
-		MPIR_proctable_size = opt.nprocs;
-
-		if(message_thread) {
-			write(job->forked_msg->par_msg->msg_pipe[1],
-			      &pipe_enum,sizeof(int));
-			write(job->forked_msg->par_msg->msg_pipe[1],
-			      &opt.nprocs,sizeof(int));
-
-			pipe_enum = PIPE_MPIR_TOTALVIEW_JOBID;
-			write(job->forked_msg->par_msg->msg_pipe[1],
-			      &pipe_enum,sizeof(int));
-			write(job->forked_msg->par_msg->msg_pipe[1],
-			      &job->jobid,sizeof(int));
-		}
+		MPIR_proctable_size = job->step_layout->num_tasks;
+		MPIR_proctable = xmalloc(sizeof(MPIR_PROCDESC)
+					 * MPIR_proctable_size);
+		totalview_jobid = NULL;
+		xstrfmtcat(totalview_jobid, "%u", job->jobid);
 	}
 
-	for (i = 0; i < ntasks; i++) {
-		int taskid = job->step_layout->tids[nodeid][i];
-
-		if(message_thread) {
-			pipe_enum = PIPE_MPIR_PROCDESC;
-			write(job->forked_msg->par_msg->msg_pipe[1],
-			      &pipe_enum,sizeof(int));
-			write(job->forked_msg->par_msg->msg_pipe[1],
-			      &taskid,sizeof(int));
-			write(job->forked_msg->par_msg->msg_pipe[1],
-			      &nodeid,sizeof(int));
-			write(job->forked_msg->par_msg->msg_pipe[1],
-			      &pid[i],sizeof(int));
+	safe_read(fd, &nodeid, sizeof(int));
+	safe_read(fd, &ntasks, sizeof(int));
+	safe_read(fd, &len, sizeof(int));
+	if (len > 0) {
+		executable = xmalloc(len);
+		safe_read(fd, executable, len);
+
+		/* remote_argv global will be NULL during an srun --attach */
+		if (remote_argv == NULL) {
+			remote_argc = 1;
+			xrealloc(remote_argv, 2 * sizeof(char *));
+			remote_argv[0] = executable;
+			remote_argv[1] = NULL;
+			name_owned = 1;
 		}
+	}
+	for (i = 0; i < ntasks; i++) {
+		MPIR_PROCDESC *tv;
+		int taskid, pid;
+
+		safe_read(fd, &taskid, sizeof(int));
+		safe_read(fd, &pid, sizeof(int));
+		/* skip ids outside the table rather than write past it */
+		if ((taskid < 0) || (taskid >= MPIR_proctable_size)) {
+			error("invalid task id %d in MPIR proctable update",
+			      taskid);
+			continue;
+		}
+		tv = &MPIR_proctable[taskid];
+		tv->host_name = job->step_layout->host[nodeid];
+		tv->pid = pid;
+		tv->executable_name = executable;
+		name_owned = 1;
 		tasks_recorded++;
 	}
 
-	if (tasks_recorded == opt.nprocs) {
-		if(message_thread) {
-			i = MPIR_DEBUG_SPAWNED;
-			pipe_enum = PIPE_MPIR_DEBUG_STATE;
-			write(job->forked_msg->par_msg->msg_pipe[1],
-			      &pipe_enum,sizeof(int));
-			write(job->forked_msg->par_msg->msg_pipe[1],
-			      &i,sizeof(int));
-		}
+	/* if all tasks are now accounted for, set the debug state and
+	   call the Breakpoint */
+	if (tasks_recorded == job->step_layout->num_tasks) {
+		MPIR_debug_state = MPIR_DEBUG_SPAWNED;
+		MPIR_Breakpoint();
+		if (opt.debugger_test)
+			_dump_proctable(job);
 	}
+
+	/* free the name if neither remote_argv nor a table entry kept it */
+	if (!name_owned)
+		xfree(executable);
+	return;
+
+rwfail:
+	error("read from srun message-handler process failed");
+	if (!name_owned)
+		xfree(executable);
+	return;
 }
 
 static void _update_step_layout(int fd, slurm_step_layout_t *layout, int nodeid)
@@ -248,16 +318,20 @@ rwfail:
 static void _dump_proctable(srun_job_t *job)
 {
 	int node_inx, task_inx, taskid;
+	int num_tasks;
 	MPIR_PROCDESC *tv;
 
 	for (node_inx=0; node_inx<job->nhosts; node_inx++) {
-		for (task_inx=0; task_inx<job->step_layout->tasks[node_inx]; task_inx++) {
+		num_tasks = job->step_layout->tasks[node_inx];
+		for (task_inx = 0; task_inx < num_tasks; task_inx++) {
 			taskid = job->step_layout->tids[node_inx][task_inx];
 			tv = &MPIR_proctable[taskid];
 			if (!tv)
 				break;
-			info("task:%d, host:%s, pid:%d",
-			      taskid, tv->host_name, tv->pid);
+			info("task:%d, host:%s, pid:%d, executable:%s",
+			     taskid, tv->host_name, tv->pid,
+			     tv->executable_name ?
+			     tv->executable_name : "(null)");
 		}
 	}
 }
@@ -342,16 +416,17 @@ _process_launch_resp(srun_job_t *job, launch_tasks_response_msg_t *msg)
 	pthread_mutex_unlock(&job->task_mutex);
 
 	if(message_thread) {
-		write(job->forked_msg->
-		      par_msg->msg_pipe[1],&pipe_enum,sizeof(int));
 		write(job->forked_msg->par_msg->msg_pipe[1],
-		      &msg->srun_node_id,sizeof(int));
+		      &pipe_enum, sizeof(int));
 		write(job->forked_msg->par_msg->msg_pipe[1],
-		      &job->host_state[msg->srun_node_id],sizeof(int));
+		      &msg->srun_node_id, sizeof(int));
+		write(job->forked_msg->par_msg->msg_pipe[1],
+		      &job->host_state[msg->srun_node_id], sizeof(int));
 	}
 
-	_build_proctable( job, msg->node_name, msg->srun_node_id,
-			  msg->count_of_pids, msg->local_pids );
+	_update_mpir_proctable(job->forked_msg->par_msg->msg_pipe[1], job,
+			       msg->srun_node_id, msg->count_of_pids,
+			       msg->local_pids, remote_argv[0]);
 
 	_print_pid_list( msg->node_name, msg->count_of_pids,
 			 msg->local_pids, remote_argv[0] );
@@ -369,10 +444,10 @@ update_tasks_state(srun_job_t *job, uint32_t nodeid)
 		uint32_t tid = job->step_layout->tids[nodeid][i];
 
 		if(message_thread) {
-			write(job->forked_msg->
-			      par_msg->msg_pipe[1],&pipe_enum,sizeof(int));
-			write(job->forked_msg->
-			      par_msg->msg_pipe[1],&tid,sizeof(int));
+			write(job->forked_msg->par_msg->msg_pipe[1],
+			      &pipe_enum,sizeof(int));
+			write(job->forked_msg->par_msg->msg_pipe[1],
+			      &tid,sizeof(int));
 			write(job->forked_msg->par_msg->msg_pipe[1],
 			      &job->task_state[tid],sizeof(int));
 		}
@@ -560,8 +635,6 @@ _reattach_handler(srun_job_t *job, slurm_msg_t *msg)
 			    job->step_layout, resp->srun_node_id);
 
 	/* Build process table for any parallel debugger
-	 * FIXME - does remote_arg* need to be updated
-	 *         in the main srun process?
 	 */
 	if ((remote_argc == 0) && (resp->executable_name)) {
 		remote_argc = 1;
@@ -570,11 +643,14 @@ _reattach_handler(srun_job_t *job, slurm_msg_t *msg)
 		resp->executable_name = NULL; /* nothing left to free */
 		remote_argv[1] = NULL;
 	}
-	_build_proctable (job, resp->node_name, resp->srun_node_id,
-			  resp->ntasks, resp->local_pids);
+	/* remote_argv may still be NULL when no executable name was sent */
+	_update_mpir_proctable(job->forked_msg->par_msg->msg_pipe[1], job,
+			       resp->srun_node_id, resp->ntasks,
+			       resp->local_pids,
+			       remote_argv ? remote_argv[0] : NULL);
 
 	_print_pid_list(resp->node_name, resp->ntasks, resp->local_pids,
-			resp->executable_name);
+			remote_argv ? remote_argv[0] : NULL);
 
 	update_running_tasks(job, resp->srun_node_id);
 
@@ -1072,42 +1148,16 @@ par_thr(void *arg)
 			job->signaled = c;
 			slurm_mutex_unlock(&job->state_mutex);
 			break;
-		case PIPE_MPIR_PROCTABLE_SIZE:
-			if(MPIR_proctable_size == 0) {
-				MPIR_proctable_size = c;
-				MPIR_proctable =
-					xmalloc(sizeof(MPIR_PROCDESC) * c);
-			}
-			break;
-		case PIPE_MPIR_TOTALVIEW_JOBID:
-			totalview_jobid = NULL;
-			xstrfmtcat(totalview_jobid, "%lu", c);
-			break;
-		case PIPE_MPIR_PROCDESC:
-			if(tid == -1) {
-				tid = c;
-				continue;
-			}
-			if(nodeid == -1) {
-				nodeid = c;
-				continue;
-			}
-			{
-				MPIR_PROCDESC *tv = &MPIR_proctable[tid];
-				tv->host_name = job->step_layout->host[nodeid];
-				debug("tv->host_name = %s", tv->host_name);
-				tv->executable_name = remote_argv[0];
-				tv->pid = c;
-				tid = -1;
-				nodeid = -1;
-			}
-			break;
 		case PIPE_MPIR_DEBUG_STATE:
 			MPIR_debug_state = c;
 			MPIR_Breakpoint();
 			if (opt.debugger_test)
 				_dump_proctable(job);
 			break;
+		case PIPE_UPDATE_MPIR_PROCTABLE:
+			_handle_update_mpir_proctable(par_msg->msg_pipe[0],
+						      job);
+			break;
 		case PIPE_UPDATE_STEP_LAYOUT:
 			_handle_update_step_layout(par_msg->msg_pipe[0],
 						   job->step_layout);
diff --git a/src/srun/srun_job.h b/src/srun/srun_job.h
index 151ff988e5107853dd2cfcb41c84d6f1e026bf37..9eb6e74ff6e04eeeb89056f03beadddf2d44bf2f 100644
--- a/src/srun/srun_job.h
+++ b/src/srun/srun_job.h
@@ -53,10 +53,8 @@ typedef enum {
 	PIPE_TASK_EXITCODE,
 	PIPE_HOST_STATE,
 	PIPE_SIGNALED,
-	PIPE_MPIR_PROCTABLE_SIZE,
-	PIPE_MPIR_TOTALVIEW_JOBID,
-	PIPE_MPIR_PROCDESC,
 	PIPE_MPIR_DEBUG_STATE,
+	PIPE_UPDATE_MPIR_PROCTABLE,
 	PIPE_UPDATE_STEP_LAYOUT
 } pipe_enum_t;
 