From 91b81fe82698a720f3a788f08d9ac81ca8bc440f Mon Sep 17 00:00:00 2001 From: Moe Jette <jette1@llnl.gov> Date: Thu, 10 Nov 2005 17:29:31 +0000 Subject: [PATCH] Put into place most infrastructure for PMI barrier. --- src/api/pmi.c | 2 +- src/api/slurm_pmi.c | 52 +++++++++++++++++++++----------- src/api/slurm_pmi.h | 2 +- src/common/slurm_protocol_defs.c | 8 +++++ src/common/slurm_protocol_defs.h | 7 +++++ src/common/slurm_protocol_pack.c | 33 +++++++++++++++++++- src/srun/msg.c | 7 +++-- src/srun/pmi.c | 10 ++++++ src/srun/pmi.h | 4 +++ 9 files changed, 103 insertions(+), 22 deletions(-) diff --git a/src/api/pmi.c b/src/api/pmi.c index 169ee5104e6..74e60ffe95d 100644 --- a/src/api/pmi.c +++ b/src/api/pmi.c @@ -553,7 +553,7 @@ int PMI_Barrier( void ) int i, j, k, rc = PMI_SUCCESS; /* Issue the RPC */ - if (slurm_get_kvs_comm_set(&kvs_set_ptr) != SLURM_SUCCESS) + if (slurm_get_kvs_comm_set(&kvs_set_ptr, pmi_rank) != SLURM_SUCCESS) return PMI_FAIL; if (kvs_set_ptr == NULL) return PMI_SUCCESS; diff --git a/src/api/slurm_pmi.c b/src/api/slurm_pmi.c index 53a03842d42..86ce5c318e4 100644 --- a/src/api/slurm_pmi.c +++ b/src/api/slurm_pmi.c @@ -75,39 +75,54 @@ int slurm_send_kvs_comm_set(struct kvs_comm_set *kvs_set_ptr) if (msg_rcv.msg_type != RESPONSE_SLURM_RC) return SLURM_UNEXPECTED_MSG_ERROR; rc = ((return_code_msg_t *) msg_rcv.data)->return_code; + slurm_free_return_code_msg((return_code_msg_t *) msg_rcv.data); return rc; } /* Wait for barrier and get full PMI Keyval space data */ -int slurm_get_kvs_comm_set(struct kvs_comm_set **kvs_set_ptr) +int slurm_get_kvs_comm_set(struct kvs_comm_set **kvs_set_ptr, int pmi_rank) { - int rc; - slurm_msg_t req_msg; - slurm_msg_t resp_msg; + int rc, pmi_fd; + slurm_msg_t msg_send, msg_rcv; + slurm_addr slurm_address; + char hostname[64]; + uint16_t port; + kvs_get_msg_t data; if (kvs_set_ptr == NULL) return EINVAL; if ((rc = _get_addr()) != SLURM_SUCCESS) return rc; + if ((pmi_fd = slurm_init_msg_engine_port(0)) < 0) + return SLURM_ERROR; + if (slurm_get_stream_addr(pmi_fd, &slurm_address) < 0) + return SLURM_ERROR; + fd_set_nonblocking(pmi_fd); + port = slurm_address.sin_port; + getnodename(hostname, sizeof(hostname)); - req_msg.msg_type = PMI_KVS_GET_REQ; - req_msg.data = NULL; + data.task_id = pmi_rank; + data.port = port; + data.hostname = hostname; + msg_send.address = srun_addr; + msg_send.msg_type = PMI_KVS_GET_REQ; + msg_send.data = &data; - /* Send the RPC to the local slurmd_step manager */ -/* FIXME, sending to slurmctld right now */ -#if 0 - if (slurm_send_recv_controller_msg(&req_msg, &resp_msg) < 0) + /* Send the RPC to the local srun communcation manager */ + if (slurm_send_recv_node_msg(&msg_send, &msg_rcv, 0) < 0) return SLURM_ERROR; + if (msg_rcv.msg_type != RESPONSE_SLURM_RC) + return SLURM_UNEXPECTED_MSG_ERROR; + rc = ((return_code_msg_t *) msg_rcv.data)->return_code; + slurm_free_return_code_msg((return_code_msg_t *) msg_rcv.data); + if (rc != SLURM_SUCCESS) + return rc; - slurm_free_cred(resp_msg.cred); - if (resp_msg.msg_type == PMI_KVS_GET_RESP) - *kvs_set_ptr = (struct kvs_comm_set *) resp_msg.data; - else - slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR); -#else + /* get the message after all tasks reach the barrier */ +/* slurm_close_accepted_conn(pmi_fd); Consider leaving socket open */ *kvs_set_ptr = NULL; -#endif + return SLURM_SUCCESS; } @@ -133,6 +148,9 @@ void slurm_free_kvs_comm_set(struct kvs_comm_set *kvs_set_ptr) { int i; + if (kvs_set_ptr == NULL) + return; + for (i=0; i<kvs_set_ptr->kvs_comm_recs; i++) _free_kvs_comm(kvs_set_ptr->kvs_comm_ptr[i]); xfree(kvs_set_ptr->kvs_comm_ptr); diff --git a/src/api/slurm_pmi.h b/src/api/slurm_pmi.h index c084b77fa0f..4450d1175a3 100644 --- a/src/api/slurm_pmi.h +++ b/src/api/slurm_pmi.h @@ -62,7 +62,7 @@ struct kvs_comm_set { int slurm_send_kvs_comm_set(struct kvs_comm_set *kvs_set_ptr); /* Wait for barrier and get full PMI Keyval space data */ -int slurm_get_kvs_comm_set(struct kvs_comm_set **kvs_set_ptr); +int slurm_get_kvs_comm_set(struct kvs_comm_set **kvs_set_ptr, int pmi_rank); /* Free kvs_comm_set returned by slurm_get_kvs_comm_set() */ void slurm_free_kvs_comm_set(struct kvs_comm_set *kvs_set_ptr); diff --git a/src/common/slurm_protocol_defs.c b/src/common/slurm_protocol_defs.c index 18fe226a9d3..bf059d7cbdf 100644 --- a/src/common/slurm_protocol_defs.c +++ b/src/common/slurm_protocol_defs.c @@ -478,6 +478,14 @@ void inline slurm_free_jobacct_msg(jobacct_msg_t *msg) xfree(msg); } +void inline slurm_free_get_kvs_msg(kvs_get_msg_t *msg) +{ + if (msg) { + xfree(msg->hostname); + xfree(msg); + } +} + char *job_state_string(enum job_states inx) { if (inx & JOB_COMPLETING) diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h index cc209f3bcb9..46579f9ccf8 100644 --- a/src/common/slurm_protocol_defs.h +++ b/src/common/slurm_protocol_defs.h @@ -453,6 +453,12 @@ typedef struct jobacct_msg { char *data; /* data structure; we just pass it to them. */ } jobacct_msg_t; +typedef struct kvs_get_msg { + uint16_t task_id; /* job step's task id */ + uint16_t port; /* port to be sent the kvs data */ + char * hostname; /* hostname to be sent the kvs data */ +} kvs_get_msg_t; + /*****************************************************************************\ * Slurm API Message Types \*****************************************************************************/ @@ -552,6 +558,7 @@ void slurm_free_job_step_info_response_msg( job_step_info_response_msg_t * msg); void slurm_free_node_info_msg(node_info_msg_t * msg); void slurm_free_partition_info_msg(partition_info_msg_t * msg); +void slurm_free_get_kvs_msg(kvs_get_msg_t *msg); extern char *job_reason_string(enum job_wait_reason inx); extern char *job_state_string(enum job_states inx); diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c index 57823022bdd..b9db1bc82fe 100644 --- a/src/common/slurm_protocol_pack.c +++ b/src/common/slurm_protocol_pack.c @@ -266,6 +266,8 @@ static void _pack_kvs_rec(struct kvs_comm *msg_ptr, Buf buffer); static int _unpack_kvs_rec(struct kvs_comm **msg_ptr, Buf buffer); static void _pack_kvs_data(struct kvs_comm_set *msg_ptr, Buf buffer); static int _unpack_kvs_data(struct kvs_comm_set **msg_ptr, Buf buffer); +static void _pack_kvs_get(kvs_get_msg_t *msg_ptr, Buf buffer); +static int _unpack_kvs_get(kvs_get_msg_t **msg_ptr, Buf buffer); /* pack_header * packs a slurm protocol header that proceeds every slurm message @@ -546,8 +548,10 @@ pack_msg(slurm_msg_t const *msg, Buf buffer) _pack_kvs_data((struct kvs_comm_set *) msg->data, buffer); break; case PMI_KVS_GET_REQ: + _pack_kvs_get((kvs_get_msg_t *) msg->data, buffer); + break; case PMI_KVS_PUT_RESP: - break; /* no data */ + break; /* no data in message */ default: debug("No pack method for msg type %i", msg->msg_type); return EINVAL; @@ -825,6 +829,8 @@ unpack_msg(slurm_msg_t * msg, Buf buffer) buffer); break; case PMI_KVS_GET_REQ: + rc = _unpack_kvs_get((kvs_get_msg_t **) &msg->data, buffer); + break; case PMI_KVS_PUT_RESP: break; /* no data */ default: @@ -3361,6 +3367,31 @@ unpack_error: return SLURM_ERROR; } +static void _pack_kvs_get(kvs_get_msg_t *msg_ptr, Buf buffer) +{ + pack16(msg_ptr->task_id, buffer); + pack16(msg_ptr->port, buffer); + packstr(msg_ptr->hostname, buffer); +} + +static int _unpack_kvs_get(kvs_get_msg_t **msg_ptr, Buf buffer) +{ + uint16_t uint16_tmp; + kvs_get_msg_t *msg; + + msg = xmalloc(sizeof(struct kvs_get_msg)); + *msg_ptr = msg; + safe_unpack16(&msg->task_id, buffer); + safe_unpack16(&msg->port, buffer); + safe_unpackstr_xmalloc(&msg->hostname, &uint16_tmp, buffer); + return SLURM_SUCCESS; + +unpack_error: + xfree(msg); + *msg_ptr = NULL; + return SLURM_ERROR; +} + /* template void pack_ ( * msg , Buf buffer ) { diff --git a/src/srun/msg.c b/src/srun/msg.c index 264e32e4842..06caeeabdac 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -705,12 +705,15 @@ _handle_msg(srun_job_t *job, slurm_msg_t *msg) slurm_free_resource_allocation_response_msg(msg->data); break; case PMI_KVS_PUT_REQ: - info("PMI_KVS_PUT_REQ received"); + debug3("PMI_KVS_PUT_REQ received"); rc = pmi_kvs_put((struct kvs_comm_set *) msg->data); slurm_send_rc_msg(msg, rc); break; case PMI_KVS_GET_REQ: - info("PMI_KVS_GET_REQ received"); + debug3("PMI_KVS_GET_REQ received"); + rc = pmi_kvs_get((kvs_get_msg_t *) msg->data); + slurm_send_rc_msg(msg, rc); + slurm_free_get_kvs_msg((kvs_get_msg_t *) msg->data); break; default: error("received spurious message type: %d\n", diff --git a/src/srun/pmi.c b/src/srun/pmi.c index ebb246f01df..c0bb1880fda 100644 --- a/src/srun/pmi.c +++ b/src/srun/pmi.c @@ -33,6 +33,7 @@ #include <slurm/slurm_errno.h> #include "src/api/slurm_pmi.h" +#include "src/common/slurm_protocol_defs.h" #include "src/common/xmalloc.h" #define _DEBUG 1 @@ -97,6 +98,7 @@ static void _print_kvs(void) #if _DEBUG int i, j; + info("KVS dump start"); for (i=0; i<kvs_comm_cnt; i++) { for (j=0; j<kvs_comm_ptr[i]->kvs_cnt; j++) { info("KVS: %s:%s:%s", kvs_comm_ptr[i]->kvs_name, @@ -111,6 +113,7 @@ extern int pmi_kvs_put(struct kvs_comm_set *kvs_set_ptr) { int i; struct kvs_comm *kvs_ptr; + /* Merge new data with old. * NOTE: We just move pointers rather than copy data where * possible for improved performance */ @@ -131,3 +134,10 @@ extern int pmi_kvs_put(struct kvs_comm_set *kvs_set_ptr) pthread_mutex_unlock(&kvs_mutex); return SLURM_SUCCESS; } + +extern int pmi_kvs_get(kvs_get_msg_t *kvs_get_ptr) +{ +debug("pmi_kvs_get: rank:%u port:%u, host:%s", kvs_get_ptr->task_id, kvs_get_ptr->port, kvs_get_ptr->hostname); + return SLURM_SUCCESS; +} + diff --git a/src/srun/pmi.h b/src/srun/pmi.h index f435be7a7ea..a7cf96e9dd0 100644 --- a/src/srun/pmi.h +++ b/src/srun/pmi.h @@ -33,4 +33,8 @@ /* Put the supplied kvs values into the common store */ extern int pmi_kvs_put(struct kvs_comm_set *kvs_set_ptr); +/* Note that a task has reached a barrier, + * transmit the kvs values to the task */ +extern int pmi_kvs_get(kvs_get_msg_t *kvs_get_ptr); + #endif -- GitLab