From d146f3100b435182c011fc074dec05f0b0cd3ccf Mon Sep 17 00:00:00 2001 From: Danny Auble <da@llnl.gov> Date: Wed, 31 Aug 2005 16:41:33 +0000 Subject: [PATCH] mods for more of a global structure when talking to poe on the message thread also used in srun --- slurm/slurm.h.in | 12 +++++ src/common/env.c | 11 +---- src/common/global_srun.c | 4 +- src/slurmctld/srun_comm.c | 5 ++ src/srun/io.c | 6 +-- src/srun/launch.c | 18 +++---- src/srun/msg.c | 100 +++++++++++++++++++++----------------- src/srun/srun.c | 1 - src/srun/srun_job.c | 6 ++- src/srun/srun_job.h | 8 +-- 10 files changed, 92 insertions(+), 79 deletions(-) diff --git a/slurm/slurm.h.in b/slurm/slurm.h.in index f6ccd528a2b..909b44c6cfc 100644 --- a/slurm/slurm.h.in +++ b/slurm/slurm.h.in @@ -321,6 +321,18 @@ typedef struct job_descriptor { /* For submit, allocate, and update requests */ * SLURM internal use only */ } job_desc_msg_t; +/* For Message thread */ +typedef struct forked_msg_pipe { + int msg_pipe[2]; + int pid; +} forked_msg_pipe_t; + +typedef struct forked_message { + forked_msg_pipe_t * par_msg; + forked_msg_pipe_t * msg_par; + enum job_states * job_state; +} forked_msg_t; + typedef struct job_info { uint32_t job_id; /* job ID */ char *name; /* name of the job */ diff --git a/src/common/env.c b/src/common/env.c index b5cb525fc23..e4041efe9de 100644 --- a/src/common/env.c +++ b/src/common/env.c @@ -374,16 +374,7 @@ int setup_env(env_t *env) debug_num = atoi(debug_env); snprintf(res_env, sizeof(res_env), "SLURM_LL_API_DEBUG=%d", debug_num); - if (env->jobid >= 0) { - snprintf(tmp_env, sizeof(tmp_env), "\nSLURM_JOBID=%d", - env->jobid); - strcat(res_env, tmp_env); - } - if (env->stepid >= 0) { - snprintf(tmp_env, sizeof(tmp_env), "\nSLURM_STEPID=%d", - env->stepid); - strcat(res_env, tmp_env); - } + setenvf(&env->env, "MP_POERESTART_ENV", res_env); /* Required for AIX/POE systems indicating pre-allocation */ diff --git a/src/common/global_srun.c b/src/common/global_srun.c index 5409c1dfa36..f9a86dce9f6 100644 --- a/src/common/global_srun.c +++ b/src/common/global_srun.c @@ -89,9 +89,9 @@ fwd_signal(srun_job_t *job, int signo) job->signaled = true; slurm_mutex_unlock(&job->state_mutex); if(message_thread) { - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &pipe_enum,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &job->signaled,sizeof(int)); } } diff --git a/src/slurmctld/srun_comm.c b/src/slurmctld/srun_comm.c index df6248689a3..0745a989755 100644 --- a/src/slurmctld/srun_comm.c +++ b/src/slurmctld/srun_comm.c @@ -172,6 +172,7 @@ extern void srun_ping (void) job_iterator = list_iterator_create(job_list); while ((job_ptr = (struct job_record *) list_next(job_iterator))) { xassert (job_ptr->magic == JOB_MAGIC); + if (job_ptr->job_state != JOB_RUNNING) continue; if ( (job_ptr->time_last_active <= old) && job_ptr->port && @@ -194,6 +195,10 @@ extern void srun_ping (void) (step_ptr->batch_step) || (step_ptr->host[0] == '\0') ) continue; + debug3("sending message to host=%s, port=%u\n", + step_ptr->host, + step_ptr->port); + addr = xmalloc(sizeof(struct sockaddr_in)); slurm_set_addr(addr, step_ptr->port, step_ptr->host); msg_arg = xmalloc(sizeof(srun_ping_msg_t)); diff --git a/src/srun/io.c b/src/srun/io.c index e78f08e5122..08c1cb85218 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -161,11 +161,11 @@ _update_task_io_state(srun_job_t *job, int taskid) if (job->task_state[taskid] == SRUN_TASK_IO_WAIT) { job->task_state[taskid] = SRUN_TASK_EXITED; if(message_thread) { - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &pipe_enum,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &taskid,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &job->task_state[taskid],sizeof(int)); } } diff --git a/src/srun/launch.c b/src/srun/launch.c index c7b4f6016bb..99f55c8310e 100644 --- a/src/srun/launch.c +++ b/src/srun/launch.c @@ -368,11 +368,11 @@ _update_failed_node(srun_job_t *j, int id) j->host_state[id] = SRUN_HOST_UNREACHABLE; if(message_thread) { - write(j->par_msg->msg_pipe[1], + write(j->forked_msg->par_msg->msg_pipe[1], &pipe_enum,sizeof(int)); - write(j->par_msg->msg_pipe[1], + write(j->forked_msg->par_msg->msg_pipe[1], &id,sizeof(int)); - write(j->par_msg->msg_pipe[1], + write(j->forked_msg->par_msg->msg_pipe[1], &j->host_state[id],sizeof(int)); } } @@ -382,11 +382,11 @@ _update_failed_node(srun_job_t *j, int id) j->task_state[j->tids[id][i]] = SRUN_TASK_FAILED; if(message_thread) { - write(j->par_msg->msg_pipe[1], + write(j->forked_msg->par_msg->msg_pipe[1], &pipe_enum,sizeof(int)); - write(j->par_msg->msg_pipe[1], + write(j->forked_msg->par_msg->msg_pipe[1], &j->tids[id][i],sizeof(int)); - write(j->par_msg->msg_pipe[1], + write(j->forked_msg->par_msg->msg_pipe[1], &j->task_state[j->tids[id][i]],sizeof(int)); } } @@ -404,11 +404,11 @@ _update_contacted_node(srun_job_t *j, int id) if (j->host_state[id] == SRUN_HOST_INIT) { j->host_state[id] = SRUN_HOST_CONTACTED; if(message_thread) { - write(j->par_msg->msg_pipe[1], + write(j->forked_msg->par_msg->msg_pipe[1], &pipe_enum,sizeof(int)); - write(j->par_msg->msg_pipe[1], + write(j->forked_msg->par_msg->msg_pipe[1], &id,sizeof(int)); - write(j->par_msg->msg_pipe[1], + write(j->forked_msg->par_msg->msg_pipe[1], &j->host_state[id],sizeof(int)); } } diff --git a/src/srun/msg.c b/src/srun/msg.c index d62b23135cf..6fa5bdaa630 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -120,15 +120,15 @@ _build_proctable(srun_job_t *job, char *host, int nodeid, int ntasks, uint32_t * /* xstrfmtcat(totalview_jobid, "%lu", job->jobid); */ if(message_thread) { - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &pipe_enum,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &opt.nprocs,sizeof(int)); pipe_enum = PIPE_MPIR_TOTALVIEW_JOBID; - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &pipe_enum,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &job->jobid,sizeof(int)); } } @@ -142,13 +142,13 @@ _build_proctable(srun_job_t *job, char *host, int nodeid, int ntasks, uint32_t * if(message_thread) { pipe_enum = PIPE_MPIR_PROCDESC; - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &pipe_enum,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &taskid,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &nodeid,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &pid[i],sizeof(int)); } @@ -164,9 +164,9 @@ _build_proctable(srun_job_t *job, char *host, int nodeid, int ntasks, uint32_t * if(message_thread) { i = MPIR_DEBUG_SPAWNED; pipe_enum = PIPE_MPIR_DEBUG_STATE; - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &pipe_enum,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &i,sizeof(int)); } } @@ -204,9 +204,9 @@ void debugger_launch_failure(srun_job_t *job) /* MPIR_Breakpoint(); */ if(message_thread && job) { i = MPIR_DEBUG_ABORTING; - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &pipe_enum,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &i,sizeof(int)); } else if(!job) { error("Hey I don't have a job to write to on the " @@ -277,10 +277,11 @@ _process_launch_resp(srun_job_t *job, launch_tasks_response_msg_t *msg) pthread_mutex_unlock(&job->task_mutex); if(message_thread) { - write(job->par_msg->msg_pipe[1],&pipe_enum,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg-> + par_msg->msg_pipe[1],&pipe_enum,sizeof(int)); + write(job->forked_msg->par_msg->msg_pipe[1], &msg->srun_node_id,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &job->host_state[msg->srun_node_id],sizeof(int)); } @@ -304,9 +305,11 @@ update_running_tasks(srun_job_t *job, uint32_t nodeid) job->task_state[tid] = SRUN_TASK_RUNNING; if(message_thread) { - write(job->par_msg->msg_pipe[1],&pipe_enum,sizeof(int)); - write(job->par_msg->msg_pipe[1],&tid,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg-> + par_msg->msg_pipe[1],&pipe_enum,sizeof(int)); + write(job->forked_msg-> + par_msg->msg_pipe[1],&tid,sizeof(int)); + write(job->forked_msg->par_msg->msg_pipe[1], &job->task_state[tid],sizeof(int)); } } @@ -325,10 +328,11 @@ update_failed_tasks(srun_job_t *job, uint32_t nodeid) job->task_state[tid] = SRUN_TASK_FAILED; if(message_thread) { - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &pipe_enum,sizeof(int)); - write(job->par_msg->msg_pipe[1],&tid,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg-> + par_msg->msg_pipe[1],&tid,sizeof(int)); + write(job->forked_msg->par_msg->msg_pipe[1], &job->task_state[tid],sizeof(int)); } tasks_exited++; @@ -360,11 +364,11 @@ _launch_handler(srun_job_t *job, slurm_msg_t *resp) slurm_mutex_unlock(&job->task_mutex); if(message_thread) { - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &pipe_enum,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &msg->srun_node_id,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &job->host_state[msg->srun_node_id],sizeof(int)); } update_failed_tasks(job, msg->srun_node_id); @@ -427,10 +431,11 @@ _reattach_handler(srun_job_t *job, slurm_msg_t *msg) slurm_mutex_unlock(&job->task_mutex); if(message_thread) { - write(job->par_msg->msg_pipe[1],&pipe_enum,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg-> + par_msg->msg_pipe[1],&pipe_enum,sizeof(int)); + write(job->forked_msg->par_msg->msg_pipe[1], &resp->srun_node_id,sizeof(int)); - write(job->par_msg->msg_pipe[1], + write(job->forked_msg->par_msg->msg_pipe[1], &job->host_state[resp->srun_node_id],sizeof(int)); } @@ -853,7 +858,7 @@ void * msg_thr(void *arg) { srun_job_t *job = (srun_job_t *) arg; - par_to_msg_t *par_msg = job->par_msg; + forked_msg_pipe_t *par_msg = job->forked_msg->par_msg; int done = 0; debug3("msg thread pid = %lu", (unsigned long) getpid()); @@ -870,8 +875,8 @@ void * par_thr(void *arg) { srun_job_t *job = (srun_job_t *) arg; - par_to_msg_t *par_msg = job->par_msg; - par_to_msg_t *msg_par = job->msg_par; + forked_msg_pipe_t *par_msg = job->forked_msg->par_msg; + forked_msg_pipe_t *msg_par = job->forked_msg->msg_par; int c; pipe_enum_t type=0; int tid=-1; @@ -967,11 +972,11 @@ msg_thr_create(srun_job_t *job) int i; pthread_attr_t attr; int c; - job->par_msg = xmalloc(sizeof(par_to_msg_t)); - job->msg_par = xmalloc(sizeof(par_to_msg_t)); - par_to_msg_t *par_msg = job->par_msg; - par_to_msg_t *msg_par = job->msg_par; - + + job->forked_msg = xmalloc(sizeof(forked_msg_t)); + job->forked_msg->par_msg = xmalloc(sizeof(forked_msg_pipe_t)); + job->forked_msg->msg_par = xmalloc(sizeof(forked_msg_pipe_t)); + set_allocate_job(job); for (i = 0; i < job->njfds; i++) { @@ -985,19 +990,21 @@ msg_thr_create(srun_job_t *job) job->jaddr[i]).sin_port)); } - if (pipe(par_msg->msg_pipe) == -1) + if (pipe(job->forked_msg->par_msg->msg_pipe) == -1) return SLURM_ERROR; // there was an error - if (pipe(msg_par->msg_pipe) == -1) + if (pipe(job->forked_msg->msg_par->msg_pipe) == -1) return SLURM_ERROR; // there was an error debug2("created the pipes for communication"); - if((par_msg->pid = fork()) == -1) + if((job->forked_msg->par_msg->pid = fork()) == -1) return SLURM_ERROR; // there was an error - else if (par_msg->pid == 0) + else if (job->forked_msg->par_msg->pid == 0) { // child: setsid(); message_thread = 1; - close(par_msg->msg_pipe[0]); // close read end of pipe - close(msg_par->msg_pipe[1]); // close write end of pipe + close(job->forked_msg-> + par_msg->msg_pipe[0]); // close read end of pipe + close(job->forked_msg-> + msg_par->msg_pipe[1]); // close write end of pipe slurm_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); if ((errno = pthread_create(&job->jtid, &attr, &msg_thr, @@ -1007,12 +1014,15 @@ msg_thr_create(srun_job_t *job) debug("Started msg to parent server thread (%lu)", (unsigned long) job->jtid); - while(read(msg_par->msg_pipe[0],&c,sizeof(int))>0) + while(read(job->forked_msg-> + msg_par->msg_pipe[0],&c,sizeof(int))>0) ; // make sure my parent doesn't leave me hangin - close(msg_par->msg_pipe[0]); // close excess fildes - xfree(par_msg); - xfree(msg_par); + close(job->forked_msg-> + msg_par->msg_pipe[0]); // close excess fildes + xfree(job->forked_msg->par_msg); + xfree(job->forked_msg->msg_par); + xfree(job->forked_msg); _exit(0); } else diff --git a/src/srun/srun.c b/src/srun/srun.c index 4c76c9eb123..714c5001fbe 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -250,7 +250,6 @@ int srun(int ac, char **av) env->nodelist = job->nodelist; env->task_count = _task_count_string (job); } - setup_env(env); xfree(env->task_count); xfree(env); diff --git a/src/srun/srun_job.c b/src/srun/srun_job.c index 0b903bedf09..37f70d1dbc6 100644 --- a/src/srun/srun_job.c +++ b/src/srun/srun_job.c @@ -228,8 +228,10 @@ update_job_state(srun_job_t *job, srun_job_state_t state) if (job->state < state) { job->state = state; if(message_thread) { - write(job->par_msg->msg_pipe[1],&pipe_enum,sizeof(int)); - write(job->par_msg->msg_pipe[1],&job->state,sizeof(int)); + write(job->forked_msg-> + par_msg->msg_pipe[1],&pipe_enum,sizeof(int)); + write(job->forked_msg-> + par_msg->msg_pipe[1],&job->state,sizeof(int)); } pthread_cond_signal(&job->state_cond); diff --git a/src/srun/srun_job.h b/src/srun/srun_job.h index 3bfe25a647f..4882ca8dfc5 100644 --- a/src/srun/srun_job.h +++ b/src/srun/srun_job.h @@ -86,11 +86,6 @@ typedef enum { SRUN_TASK_ABNORMAL_EXIT } srun_task_state_t; -typedef struct par_to_msg { - int msg_pipe[2]; - int pid; -} par_to_msg_t; - typedef struct srun_job { uint32_t jobid; /* assigned job id */ uint32_t stepid; /* assigned step id */ @@ -159,8 +154,7 @@ typedef struct srun_job { FILE *errstream; int stdinfd; bool *stdin_eof; /* true if task i processed stdin eof */ - par_to_msg_t *par_msg; - par_to_msg_t *msg_par; + forked_msg_t *forked_msg; select_jobinfo_t select_jobinfo; } srun_job_t; -- GitLab