From c59d3340cb15f1b32dd97415765fd20e39c72bca Mon Sep 17 00:00:00 2001 From: "Christopher J. Morrone" <morrone2@llnl.gov> Date: Fri, 14 Jul 2006 21:49:47 +0000 Subject: [PATCH] Significantly reduce the size of the launch_tasks_request_msg_t arrays "resp_port" and "io_port". Now they just list the available listening ports in the client (srun/slaunch), instead of listing a port per node (which contains many duplicate ports). Each slurmd picks the port to use by taking its own rank in the job step modulo the number of available ports. --- src/api/step_io.c | 2 +- src/api/step_launch.c | 12 +++++++----- src/common/slurm_protocol_defs.h | 6 ++++-- src/common/slurm_protocol_pack.c | 24 ++++++++++++++++++------ src/slurmd/slurmstepd/mgr.c | 2 +- src/slurmd/slurmstepd/slurmstepd_job.c | 8 ++++++-- src/srun/launch.c | 19 ++++++++++++------- 7 files changed, 49 insertions(+), 24 deletions(-) diff --git a/src/api/step_io.c b/src/api/step_io.c index 1cd6ac489a0..fe5d4537f77 100644 --- a/src/api/step_io.c +++ b/src/api/step_io.c @@ -1162,7 +1162,7 @@ client_io_handler_create(slurm_step_io_fds_t fds, * all of the slurmds to establish IO streams with srun, without * overstressing the TCP/IP backoff/retry algorithm */ - cio->num_listen = _estimate_nports(num_nodes, 64); + cio->num_listen = _estimate_nports(num_nodes, 48); cio->listensock = (int *) xmalloc(cio->num_listen * sizeof(int)); cio->listenport = (int *) xmalloc(cio->num_listen * sizeof(int)); diff --git a/src/api/step_launch.c b/src/api/step_launch.c index 8da80f711e3..9e7426d2afb 100644 --- a/src/api/step_launch.c +++ b/src/api/step_launch.c @@ -214,11 +214,13 @@ int slurm_step_launch (slurm_step_ctx ctx, launch.io_port = xmalloc(sizeof(uint16_t) * launch.nnodes); launch.resp_port = xmalloc(sizeof(uint16_t) * launch.nnodes); - for (i = 0; i < launch.nnodes; i++) { - client_io_t *client_io = ctx->launch_state->client_io; - int port_idx = i % client_io->num_listen; - - launch.io_port[i] = ntohs(client_io->listenport[port_idx]); + 
launch.num_io_port = ctx->launch_state->client_io->num_listen; + for (i = 0; i < launch.num_io_port; i++) { + launch.io_port[i] = + ntohs(ctx->launch_state->client_io->listenport[i]); + } + launch.num_resp_port = 1; + for (i = 0; i < launch.num_resp_port; i++) { launch.resp_port[i] = ntohs(ctx->launch_state->msg_port); } diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h index b1b710be8c6..061985234c5 100644 --- a/src/common/slurm_protocol_defs.h +++ b/src/common/slurm_protocol_defs.h @@ -376,8 +376,10 @@ typedef struct launch_tasks_request_msg { char *cpu_bind; /* binding map for map/mask_cpu */ mem_bind_type_t mem_bind_type; /* --mem_bind= */ char *mem_bind; /* binding map for tasks to memory */ - uint16_t *resp_port; - uint16_t *io_port; + uint16_t num_resp_port; + uint16_t *resp_port; /* array of available response ports */ + uint16_t num_io_port; + uint16_t *io_port; /* array of available client IO listen ports */ uint16_t task_flags; uint32_t **global_task_ids; diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c index aef16be9ba7..b9043883778 100644 --- a/src/common/slurm_protocol_pack.c +++ b/src/common/slurm_protocol_pack.c @@ -2552,12 +2552,16 @@ _pack_launch_tasks_request_msg(launch_tasks_request_msg_t * msg, Buf buffer) for(i=0; i<msg->nnodes; i++) { pack32((uint32_t)msg->tasks_to_launch[i], buffer); pack32((uint32_t)msg->cpus_allocated[i], buffer); - pack16((uint16_t)msg->resp_port[i], buffer); - pack16((uint16_t)msg->io_port[i], buffer); pack32_array(msg->global_task_ids[i], msg->tasks_to_launch[i], buffer); } + pack16((uint16_t)msg->num_resp_port, buffer); + for(i = 0; i < msg->num_resp_port; i++) + pack16((uint16_t)msg->resp_port[i], buffer); + pack16((uint16_t)msg->num_io_port, buffer); + for(i = 0; i < msg->num_io_port; i++) + pack16((uint16_t)msg->io_port[i], buffer); slurm_pack_slurm_addr(&msg->orig_addr, buffer); packstr_array(msg->env, msg->envc, buffer); packstr(msg->cwd, buffer); @@ 
-2603,14 +2607,10 @@ _unpack_launch_tasks_request_msg(launch_tasks_request_msg_t ** goto unpack_error; msg->tasks_to_launch = xmalloc(sizeof(uint32_t) * msg->nnodes); msg->cpus_allocated = xmalloc(sizeof(uint32_t) * msg->nnodes); - msg->resp_port = xmalloc(sizeof(uint16_t) * msg->nnodes); - msg->io_port = xmalloc(sizeof(uint16_t) * msg->nnodes); msg->global_task_ids = xmalloc(sizeof(uint32_t *) * msg->nnodes); for(i=0; i<msg->nnodes; i++) { safe_unpack32(&msg->tasks_to_launch[i], buffer); safe_unpack32(&msg->cpus_allocated[i], buffer); - safe_unpack16(&msg->resp_port[i], buffer); - safe_unpack16(&msg->io_port[i], buffer); safe_unpack32_array(&msg->global_task_ids[i], &uint32_tmp, buffer); @@ -2618,6 +2618,18 @@ _unpack_launch_tasks_request_msg(launch_tasks_request_msg_t ** goto unpack_error; } + safe_unpack16(&msg->num_resp_port, buffer); + if (msg->num_resp_port > 0) { + msg->resp_port = xmalloc(sizeof(uint16_t)*msg->num_resp_port); + for (i = 0; i < msg->num_resp_port; i++) + safe_unpack16(&msg->resp_port[i], buffer); + } + safe_unpack16(&msg->num_io_port, buffer); + if (msg->num_io_port > 0) { + msg->io_port = xmalloc(sizeof(uint16_t)*msg->num_io_port); + for (i = 0; i < msg->num_io_port; i++) + safe_unpack16(&msg->io_port[i], buffer); + } slurm_unpack_slurm_addr_no_alloc(&msg->orig_addr, buffer); safe_unpackstr_array(&msg->env, &msg->envc, buffer); safe_unpackstr_xmalloc(&msg->cwd, &uint16_tmp, buffer); diff --git a/src/slurmd/slurmstepd/mgr.c b/src/slurmd/slurmstepd/mgr.c index f38bd6a9b6d..8e712914363 100644 --- a/src/slurmd/slurmstepd/mgr.c +++ b/src/slurmd/slurmstepd/mgr.c @@ -1320,7 +1320,7 @@ _send_launch_failure (launch_tasks_request_msg_t *msg, slurm_addr *cli, int rc) memcpy(&resp_msg.address, cli, sizeof(slurm_addr)); slurm_set_addr(&resp_msg.address, - msg->resp_port[msg->srun_node_id], + msg->resp_port[msg->srun_node_id % msg->num_resp_port], NULL); resp_msg.data = &resp; resp_msg.msg_type = RESPONSE_LAUNCH_TASKS; diff --git 
a/src/slurmd/slurmstepd/slurmstepd_job.c b/src/slurmd/slurmstepd/slurmstepd_job.c index 2bcfad72333..6cb42527a50 100644 --- a/src/slurmd/slurmstepd/slurmstepd_job.c +++ b/src/slurmd/slurmstepd/slurmstepd_job.c @@ -198,9 +198,13 @@ job_create(launch_tasks_request_msg_t *msg, slurm_addr *cli_addr) job->envtp->mem_bind = NULL; memcpy(&resp_addr, &msg->orig_addr, sizeof(slurm_addr)); - slurm_set_addr(&resp_addr, msg->resp_port[msg->srun_node_id], NULL); + slurm_set_addr(&resp_addr, + msg->resp_port[msg->srun_node_id % msg->num_resp_port], + NULL); memcpy(&io_addr, &msg->orig_addr, sizeof(slurm_addr)); - slurm_set_addr(&io_addr, msg->io_port[msg->srun_node_id], NULL); + slurm_set_addr(&io_addr, + msg->io_port[msg->srun_node_id % msg->num_io_port], + NULL); srun = srun_info_create(msg->cred, &resp_addr, &io_addr); diff --git a/src/srun/launch.c b/src/srun/launch.c index f7f871d1eab..3dd4693dcb9 100644 --- a/src/srun/launch.c +++ b/src/srun/launch.c @@ -175,14 +175,19 @@ launch(void *arg) r.global_task_ids = job->step_layout->tids; r.cpus_allocated = job->step_layout->cpus; - r.io_port = xmalloc(sizeof(uint16_t) * job->step_layout->num_hosts); - r.resp_port = xmalloc(sizeof(uint16_t) * job->step_layout->num_hosts); - - for (i = 0; i < job->step_layout->num_hosts; i++) { - r.io_port[i] = ntohs(job->client_io->listenport[ - i%job->client_io->num_listen]); - r.resp_port[i] = ntohs(job->jaddr[i%job->njfds].sin_port); + r.num_resp_port = job->njfds; + r.resp_port = xmalloc(sizeof(uint16_t) * r.num_resp_port); + for (i = 0; i < r.num_resp_port; i++) { + r.resp_port[i] = ntohs(job->jaddr[i].sin_port); + } + + r.num_io_port = job->client_io->num_listen; + r.io_port = xmalloc(sizeof(uint16_t) * r.num_io_port); + for (i = 0; i < r.num_io_port; i++) { + r.io_port[i] = ntohs(job->client_io->listenport[i]); } + info("num_io_port = %d", r.num_io_port); + info("num_resp_port = %d", r.num_resp_port); msg_array_ptr[0].msg_type = REQUEST_LAUNCH_TASKS; msg_array_ptr[0].data = &r; -- GitLab