diff --git a/src/api/spawn.c b/src/api/spawn.c index ae5d3945fcf6b94e238bfe1085fd6c9a135a5daf..b297e631ddf08af9aee86581c9f3e2ec94a75648 100644 --- a/src/api/spawn.c +++ b/src/api/spawn.c @@ -321,7 +321,7 @@ extern int slurm_spawn (slurm_step_ctx ctx, int *fd_array) spawn_task_request_msg_t *msg_array_ptr; int *sock_array; slurm_msg_t *req_array_ptr; - int i, j, rc = SLURM_SUCCESS; + int i, rc = SLURM_SUCCESS; if ((ctx == NULL) || (ctx->magic != STEP_CTX_MAGIC) || @@ -334,8 +334,8 @@ extern int slurm_spawn (slurm_step_ctx ctx, int *fd_array) return SLURM_ERROR; /* validate fd_array and bind them to ports */ - sock_array = xmalloc(ctx->num_tasks * sizeof(int)); - for (i=0; i<ctx->num_tasks; i++) { + sock_array = xmalloc(ctx->nhosts * sizeof(int)); + for (i=0; i<ctx->nhosts; i++) { if (fd_array[i] < 0) { slurm_seterrno(EINVAL); free(sock_array); @@ -351,43 +351,40 @@ extern int slurm_spawn (slurm_step_ctx ctx, int *fd_array) } msg_array_ptr = xmalloc(sizeof(spawn_task_request_msg_t) * - ctx->num_tasks); - req_array_ptr = xmalloc(sizeof(slurm_msg_t) * ctx->num_tasks); + ctx->nhosts); + req_array_ptr = xmalloc(sizeof(slurm_msg_t) * ctx->nhosts); for (i=0; i<ctx->nhosts; i++) { - for (j=0; j<ctx->tasks[i]; j++) { - uint32_t tid = ctx->tids[i][j]; - spawn_task_request_msg_t *r = &msg_array_ptr[tid]; - slurm_msg_t *m = &req_array_ptr[tid]; - - /* Common message contents */ - r->job_id = ctx->job_id; - r->uid = ctx->user_id; - r->argc = ctx->argc; - r->argv = ctx->argv; - r->cred = ctx->step_resp->cred; - r->job_step_id = ctx->step_resp->job_step_id; - r->envc = ctx->envc; - r->env = ctx->env; - r->cwd = ctx->cwd; - r->nnodes = ctx->nhosts; - r->nprocs = ctx->num_tasks; - r->switch_job = ctx->step_resp->switch_job; - r->slurmd_debug = 7; - - /*Task specific message contents */ - r->global_task_id = ctx->tids[i][j]; - r->cpus_allocated = ctx->cpus[i]; - r->srun_node_id = (uint32_t) i; - r->io_port = ntohs(sock_array[i]); - m->msg_type = REQUEST_SPAWN_TASK; - m->data = &msg_array_ptr[i]; - memcpy(&m->address, &ctx->alloc_resp->node_addr[i], - sizeof(slurm_addr)); -#if _DEBUG - printf("tid=%d, fd=%d, port=%u, node_id=%u\n", - tid, fd_array[tid], r->io_port, i); + spawn_task_request_msg_t *r = &msg_array_ptr[i]; + slurm_msg_t *m = &req_array_ptr[i]; + + /* Common message contents */ + r->job_id = ctx->job_id; + r->uid = ctx->user_id; + r->argc = ctx->argc; + r->argv = ctx->argv; + r->cred = ctx->step_resp->cred; + r->job_step_id = ctx->step_resp->job_step_id; + r->envc = ctx->envc; + r->env = ctx->env; + r->cwd = ctx->cwd; + r->nnodes = ctx->nhosts; + r->nprocs = ctx->num_tasks; + r->switch_job = ctx->step_resp->switch_job; + r->slurmd_debug = 7; + + /*Task specific message contents */ + r->global_task_id = ctx->tids[i][0]; + r->cpus_allocated = ctx->cpus[i]; + r->srun_node_id = (uint32_t) i; + r->io_port = ntohs(sock_array[i]); + m->msg_type = REQUEST_SPAWN_TASK; + m->data = r; + memcpy(&m->address, &ctx->alloc_resp->node_addr[i], + sizeof(slurm_addr)); +#if _DEBUG + printf("tid=%d, fd=%d, port=%u, node_id=%u\n", + tid, fd_array[i], r->io_port, i); #endif - } } rc = _p_launch(req_array_ptr, ctx); @@ -693,19 +690,19 @@ static int _p_launch(slurm_msg_t *req, slurm_step_ctx ctx) int rc = SLURM_SUCCESS, i; thd_t *thd; - thd = xmalloc(sizeof(thd_t) * ctx->num_tasks); + thd = xmalloc(sizeof(thd_t) * ctx->nhosts); if (thd == NULL) { slurm_seterrno(ENOMEM); return SLURM_ERROR; } - for (i=0; i<ctx->num_tasks; i++) { + for (i=0; i<ctx->nhosts; i++) { thd[i].state = DSH_NEW; thd[i].req = &req[i]; } /* start all the other threads (up to _MAX_THREAD_COUNT active) */ - for (i=0; i<ctx->num_tasks; i++) { + for (i=0; i<ctx->nhosts; i++) { /* wait until "room" for another thread */ slurm_mutex_lock(&thread_mutex); while (threads_active >= _MAX_THREAD_COUNT) { @@ -733,7 +730,7 @@ static int _p_launch(slurm_msg_t *req, slurm_step_ctx ctx) /* wait for all tasks to terminate*/ slurm_mutex_lock(&thread_mutex); - for (i=0; i<ctx->num_tasks; i++) { + for (i=0; i<ctx->nhosts; i++) { while (thd[i].state < DSH_DONE) { /* wait until another thread completes*/ pthread_cond_wait(&thread_cond, &thread_mutex);