diff --git a/src/api/step_io.c b/src/api/step_io.c index 1cd6ac489a09d9bfcb25d614507ed126fc78f142..fe5d4537f7714c011017741a1807dc08c692fddc 100644 --- a/src/api/step_io.c +++ b/src/api/step_io.c @@ -1162,7 +1162,7 @@ client_io_handler_create(slurm_step_io_fds_t fds, * all of the slurmds to establish IO streams with srun, without * overstressing the TCP/IP backoff/retry algorithm */ - cio->num_listen = _estimate_nports(num_nodes, 64); + cio->num_listen = _estimate_nports(num_nodes, 48); cio->listensock = (int *) xmalloc(cio->num_listen * sizeof(int)); cio->listenport = (int *) xmalloc(cio->num_listen * sizeof(int)); diff --git a/src/api/step_launch.c b/src/api/step_launch.c index 8da80f711e39fab2b6f2d5e8c61fc88bae63c867..9e7426d2afbd1f2222fd798c8a163cc4895d8919 100644 --- a/src/api/step_launch.c +++ b/src/api/step_launch.c @@ -214,11 +214,13 @@ int slurm_step_launch (slurm_step_ctx ctx, launch.io_port = xmalloc(sizeof(uint16_t) * launch.nnodes); launch.resp_port = xmalloc(sizeof(uint16_t) * launch.nnodes); - for (i = 0; i < launch.nnodes; i++) { - client_io_t *client_io = ctx->launch_state->client_io; - int port_idx = i % client_io->num_listen; - - launch.io_port[i] = ntohs(client_io->listenport[port_idx]); + launch.num_io_port = ctx->launch_state->client_io->num_listen; + for (i = 0; i < launch.num_io_port; i++) { + launch.io_port[i] = + ntohs(ctx->launch_state->client_io->listenport[i]); + } + launch.num_resp_port = 1; + for (i = 0; i < launch.num_resp_port; i++) { launch.resp_port[i] = ntohs(ctx->launch_state->msg_port); } diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h index b1b710be8c63362b536f782e0e1d6fda9e285a91..061985234c51d0b22b7aa164edac7c8f1fbdb534 100644 --- a/src/common/slurm_protocol_defs.h +++ b/src/common/slurm_protocol_defs.h @@ -376,8 +376,10 @@ typedef struct launch_tasks_request_msg { char *cpu_bind; /* binding map for map/mask_cpu */ mem_bind_type_t mem_bind_type; /* --mem_bind= */ char *mem_bind; /* binding map for tasks to memory */ - uint16_t *resp_port; - uint16_t *io_port; + uint16_t num_resp_port; + uint16_t *resp_port; /* array of available response ports */ + uint16_t num_io_port; + uint16_t *io_port; /* array of available client IO listen ports */ uint16_t task_flags; uint32_t **global_task_ids; diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c index aef16be9ba7c0864c2ca900ad7a8d8f5a8b1e6ef..b9043883778fe231317607339f272e516b7bdb26 100644 --- a/src/common/slurm_protocol_pack.c +++ b/src/common/slurm_protocol_pack.c @@ -2552,12 +2552,16 @@ _pack_launch_tasks_request_msg(launch_tasks_request_msg_t * msg, Buf buffer) for(i=0; i<msg->nnodes; i++) { pack32((uint32_t)msg->tasks_to_launch[i], buffer); pack32((uint32_t)msg->cpus_allocated[i], buffer); - pack16((uint16_t)msg->resp_port[i], buffer); - pack16((uint16_t)msg->io_port[i], buffer); pack32_array(msg->global_task_ids[i], msg->tasks_to_launch[i], buffer); } + pack16((uint16_t)msg->num_resp_port, buffer); + for(i = 0; i < msg->num_resp_port; i++) + pack16((uint16_t)msg->resp_port[i], buffer); + pack16((uint16_t)msg->num_io_port, buffer); + for(i = 0; i < msg->num_io_port; i++) + pack16((uint16_t)msg->io_port[i], buffer); slurm_pack_slurm_addr(&msg->orig_addr, buffer); packstr_array(msg->env, msg->envc, buffer); packstr(msg->cwd, buffer); @@ -2603,14 +2607,10 @@ _unpack_launch_tasks_request_msg(launch_tasks_request_msg_t ** goto unpack_error; msg->tasks_to_launch = xmalloc(sizeof(uint32_t) * msg->nnodes); msg->cpus_allocated = xmalloc(sizeof(uint32_t) * msg->nnodes); - msg->resp_port = xmalloc(sizeof(uint16_t) * msg->nnodes); - msg->io_port = xmalloc(sizeof(uint16_t) * msg->nnodes); msg->global_task_ids = xmalloc(sizeof(uint32_t *) * msg->nnodes); for(i=0; i<msg->nnodes; i++) { safe_unpack32(&msg->tasks_to_launch[i], buffer); safe_unpack32(&msg->cpus_allocated[i], buffer); - safe_unpack16(&msg->resp_port[i], buffer); - safe_unpack16(&msg->io_port[i], buffer); safe_unpack32_array(&msg->global_task_ids[i], &uint32_tmp, buffer); @@ -2618,6 +2618,18 @@ _unpack_launch_tasks_request_msg(launch_tasks_request_msg_t ** goto unpack_error; } + safe_unpack16(&msg->num_resp_port, buffer); + if (msg->num_resp_port > 0) { + msg->resp_port = xmalloc(sizeof(uint16_t)*msg->num_resp_port); + for (i = 0; i < msg->num_resp_port; i++) + safe_unpack16(&msg->resp_port[i], buffer); + } + safe_unpack16(&msg->num_io_port, buffer); + if (msg->num_io_port > 0) { + msg->io_port = xmalloc(sizeof(uint16_t)*msg->num_io_port); + for (i = 0; i < msg->num_io_port; i++) + safe_unpack16(&msg->io_port[i], buffer); + } slurm_unpack_slurm_addr_no_alloc(&msg->orig_addr, buffer); safe_unpackstr_array(&msg->env, &msg->envc, buffer); safe_unpackstr_xmalloc(&msg->cwd, &uint16_tmp, buffer); diff --git a/src/slurmd/slurmstepd/mgr.c b/src/slurmd/slurmstepd/mgr.c index f38bd6a9b6deea50dd35332921df98240e9dce25..8e7129143639269603643c19303da3649ae8c0f4 100644 --- a/src/slurmd/slurmstepd/mgr.c +++ b/src/slurmd/slurmstepd/mgr.c @@ -1320,7 +1320,7 @@ _send_launch_failure (launch_tasks_request_msg_t *msg, slurm_addr *cli, int rc) memcpy(&resp_msg.address, cli, sizeof(slurm_addr)); slurm_set_addr(&resp_msg.address, - msg->resp_port[msg->srun_node_id], + msg->resp_port[msg->srun_node_id % msg->num_resp_port], NULL); resp_msg.data = &resp; resp_msg.msg_type = RESPONSE_LAUNCH_TASKS; diff --git a/src/slurmd/slurmstepd/slurmstepd_job.c b/src/slurmd/slurmstepd/slurmstepd_job.c index 2bcfad72333ccaf31e1dbf6cec2c46b7c0ef466e..6cb42527a50ebdb46b9ae24ae92666af9c0fd0fa 100644 --- a/src/slurmd/slurmstepd/slurmstepd_job.c +++ b/src/slurmd/slurmstepd/slurmstepd_job.c @@ -198,9 +198,13 @@ job_create(launch_tasks_request_msg_t *msg, slurm_addr *cli_addr) job->envtp->mem_bind = NULL; memcpy(&resp_addr, &msg->orig_addr, sizeof(slurm_addr)); - slurm_set_addr(&resp_addr, msg->resp_port[msg->srun_node_id], NULL); + slurm_set_addr(&resp_addr, + msg->resp_port[msg->srun_node_id % msg->num_resp_port], + NULL); memcpy(&io_addr, &msg->orig_addr, sizeof(slurm_addr)); - slurm_set_addr(&io_addr, msg->io_port[msg->srun_node_id], NULL); + slurm_set_addr(&io_addr, + msg->io_port[msg->srun_node_id % msg->num_io_port], + NULL); srun = srun_info_create(msg->cred, &resp_addr, &io_addr); diff --git a/src/srun/launch.c b/src/srun/launch.c index f7f871d1eab36e488565d3bc52076f6aa764aec1..3dd4693dcb9804543041f7bfd90b914f26d7affa 100644 --- a/src/srun/launch.c +++ b/src/srun/launch.c @@ -175,14 +175,19 @@ launch(void *arg) r.global_task_ids = job->step_layout->tids; r.cpus_allocated = job->step_layout->cpus; - r.io_port = xmalloc(sizeof(uint16_t) * job->step_layout->num_hosts); - r.resp_port = xmalloc(sizeof(uint16_t) * job->step_layout->num_hosts); - - for (i = 0; i < job->step_layout->num_hosts; i++) { - r.io_port[i] = ntohs(job->client_io->listenport[ - i%job->client_io->num_listen]); - r.resp_port[i] = ntohs(job->jaddr[i%job->njfds].sin_port); + r.num_resp_port = job->njfds; + r.resp_port = xmalloc(sizeof(uint16_t) * r.num_resp_port); + for (i = 0; i < r.num_resp_port; i++) { + r.resp_port[i] = ntohs(job->jaddr[i].sin_port); + } + + r.num_io_port = job->client_io->num_listen; + r.io_port = xmalloc(sizeof(uint16_t) * r.num_io_port); + for (i = 0; i < r.num_io_port; i++) { + r.io_port[i] = ntohs(job->client_io->listenport[i]); } + info("num_io_port = %d", r.num_io_port); + info("num_resp_port = %d", r.num_resp_port); msg_array_ptr[0].msg_type = REQUEST_LAUNCH_TASKS; msg_array_ptr[0].data = &r;