PMI: add KVS commit/barrier RPC plumbing (reviewed copy of the patch)

Line breaks, indentation and blank lines below were restored from a
whitespace-collapsed copy of this patch.  Verify whitespace against the
target tree (or apply with "patch -l") before use.

Changes relative to the patch as received:
 - src/api/pmi.c, PMI_KVS_Commit(): check the realloc()/malloc() results,
   drop the unused "Buf buffer" local and the redundant NULL test before
   free().
 - src/api/slurm_pmi.c: _free_kvs_comm() now also releases the record
   itself (it was leaked before); both free routines accept NULL.
 - src/common/slurm_protocol_pack.c, _unpack_kvs_data(): size the pointer
   array with sizeof(struct kvs_comm *), skip records that were never
   unpacked in the error path, and free every allocation of those that
   were.  Short header comments added to the new pack/unpack helpers.
 - "index" lines were dropped for the three files listed above because
   their post-image blob ids no longer apply.

diff --git a/src/api/Makefile.am b/src/api/Makefile.am
index 713c952afd036fd1caf49c201f612165f726b80f..88b42cf2d6a0e4af98eda703d7e202a3620404b8 100644
--- a/src/api/Makefile.am
+++ b/src/api/Makefile.am
@@ -56,6 +56,7 @@ libslurm_la_SOURCES = \
 	node_info.c \
 	node_select_info.c node_select_info.h \
 	partition_info.c \
+	slurm_pmi.c slurm_pmi.h \
 	spawn.c \
 	submit.c \
 	reconfigure.c \
diff --git a/src/api/pmi.c b/src/api/pmi.c
--- a/src/api/pmi.c
+++ b/src/api/pmi.c
@@ -81,29 +81,24 @@
 #define _GNU_SOURCE
 
 #include <pthread.h>
-#include <stdint.h>
 #include <stdlib.h>
 #include <string.h>
 #include <slurm/pmi.h>
+#include <slurm/slurm_errno.h>
 
-/* These define values should probaby be moved to a location easily
- * accessible to other SLURM tools. */
-#define PMI_MAX_GRP_ID_LEN	32	/* Maximum size of a process group ID */
-#define PMI_MAX_KEY_LEN		256	/* Maximum size of a PMI key */
-#define PMI_MAX_KVSNAME_LEN	256	/* Maximum size of KVS name */
-#define PMI_MAX_VAL_LEN		256	/* Maximum size of a PMI value */
+#include "src/api/slurm_pmi.h"
 
 #define KVS_STATE_LOCAL    0
 #define KVS_STATE_DEFUNCT  1
 
-/* default key names form is jobid.stepid[.sequence] */
+/* default key names form is jobid.stepid[.taskid.sequence] */
 struct kvs_rec {
-	char *		kvs_name;
-	int		kvs_state;	/* see KVS_STATE_* */
-	int		kvs_cnt;
-	int		kvs_inx;	/* iteration index */
-	char **		kvs_keys;
-	char **		kvs_values;
+	char *		kvs_name;
+	uint16_t	kvs_state;	/* see KVS_STATE_* */
+	uint16_t	kvs_cnt;	/* count of key-pairs */
+	uint16_t	kvs_inx;	/* iteration index */
+	char **		kvs_keys;
+	char **		kvs_values;
 };
 
 static void _init_kvs( char kvsname[] );
@@ -118,13 +113,7 @@ int pmi_size;
 int pmi_spawned;
 int pmi_rank;
 
-#if 1
-  pthread_mutex_t kvs_mutex = PTHREAD_MUTEX_INITIALIZER;
-#else
-  int kvs_mutex;
-  static inline int pthread_mutex_lock(int *kvs_mutex) { return 0; }
-  static inline int pthread_mutex_unlock(int *kvs_mutex) { return 0; }
-#endif
+pthread_mutex_t kvs_mutex = PTHREAD_MUTEX_INITIALIZER;
 int kvs_rec_cnt = 0;
 struct kvs_rec *kvs_recs;
 
@@ -491,15 +480,7 @@ as long as the number returned by 'PMI_Get_id_length_max()'.
 @*/
 int PMI_Get_id( char id_str[], int length )
 {
-	if (id_str == NULL)
-		return PMI_ERR_INVALID_ARG;
-	if (length < PMI_MAX_GRP_ID_LEN)
-		return PMI_ERR_INVALID_LENGTH;
-	if ((pmi_jobid < 0) || (pmi_stepid < 0))
-		return PMI_FAIL;
-
-	snprintf(id_str, length, "%ld.%ld", pmi_jobid, pmi_stepid);
-	return PMI_SUCCESS;
+	return PMI_FAIL;
 }
 
 /*@
@@ -549,10 +530,6 @@ This function returns the maximum length of a process group id string.
 @*/
 int PMI_Get_id_length_max( int *length )
 {
-	if (length == NULL)
-		return PMI_ERR_INVALID_ARG;
-
-	*length = PMI_MAX_GRP_ID_LEN;
 	return PMI_FAIL;
 }
 
@@ -571,8 +548,30 @@ have called 'PMI_Barrier()'.
 @*/
 int PMI_Barrier( void )
 {
-	/* FIXME */
-	return PMI_FAIL;
+	struct kvs_comm_set *kvs_set_ptr = NULL;
+	struct kvs_comm *kvs_ptr;
+	int i, j, k, rc = PMI_SUCCESS;
+
+	/* Issue the RPC */
+	if (slurm_get_kvs_comm_set(&kvs_set_ptr) != SLURM_SUCCESS)
+		return PMI_FAIL;
+	if (kvs_set_ptr == NULL)
+		return PMI_SUCCESS;
+
+	for (i=0; i<kvs_set_ptr->kvs_comm_recs; i++) {
+		kvs_ptr = kvs_set_ptr->kvs_comm_ptr[i];
+		for (j=0; j<kvs_ptr->kvs_cnt; j++) {
+			k = PMI_KVS_Put(kvs_ptr->kvs_name,
+				kvs_ptr->kvs_keys[j],
+				kvs_ptr->kvs_values[j]);
+			if (k != PMI_SUCCESS)
+				rc = k;
+		}
+	}
+
+	/* Release temporary storage from RPC */
+	slurm_free_kvs_comm_set(kvs_set_ptr);
+	return rc;
 }
 
 /*@
@@ -966,11 +965,57 @@ the specified keyval space. It is a process local operation.
 @*/
 int PMI_KVS_Commit( const char kvsname[] )
 {
+	struct kvs_comm_set kvs_set;
+	struct kvs_comm **tmp_ptr, *kvs_ptr;
+	int i, rc = PMI_SUCCESS;
+
 	if ((kvsname == NULL) || (strlen(kvsname) > PMI_MAX_KVSNAME_LEN))
 		return PMI_ERR_INVALID_ARG;
 
-	/* FIXME */
-	return PMI_FAIL;
+	/* Pack records into RPC for sending to slurmd_step
+	 * NOTE: For arrays we copy pointers, not values */
+	kvs_set.task_id       = pmi_rank;
+	kvs_set.kvs_comm_recs = 0;
+	kvs_set.kvs_comm_ptr  = NULL;
+
+	pthread_mutex_lock(&kvs_mutex);
+	for (i=0; i<kvs_rec_cnt; i++) {
+		if (kvs_recs[i].kvs_state == KVS_STATE_DEFUNCT)
+			continue;
+		/* Grow through a temporary so the array built so far
+		 * is still reachable (and freed below) if realloc fails */
+		tmp_ptr = realloc(kvs_set.kvs_comm_ptr,
+			(sizeof(struct kvs_comm *) *
+			(kvs_set.kvs_comm_recs+1)));
+		if (tmp_ptr == NULL) {
+			rc = PMI_FAIL;
+			break;
+		}
+		kvs_set.kvs_comm_ptr = tmp_ptr;
+		kvs_ptr = malloc(sizeof(struct kvs_comm));
+		if (kvs_ptr == NULL) {
+			rc = PMI_FAIL;
+			break;
+		}
+		kvs_ptr->kvs_name   = kvs_recs[i].kvs_name;
+		kvs_ptr->kvs_cnt    = kvs_recs[i].kvs_cnt;
+		kvs_ptr->kvs_keys   = kvs_recs[i].kvs_keys;
+		kvs_ptr->kvs_values = kvs_recs[i].kvs_values;
+		kvs_set.kvs_comm_ptr[kvs_set.kvs_comm_recs++] = kvs_ptr;
+	}
+
+	/* Send the RPC, unless the set could not be built */
+	if ((rc == PMI_SUCCESS) &&
+	    (slurm_send_kvs_comm_set(&kvs_set) != SLURM_SUCCESS))
+		rc = PMI_FAIL;
+	pthread_mutex_unlock(&kvs_mutex);
+
+	/* Free any temporary storage */
+	for (i=0; i<kvs_set.kvs_comm_recs; i++)
+		free(kvs_set.kvs_comm_ptr[i]);
+	free(kvs_set.kvs_comm_ptr);
+
+	return rc;
 }
 
 /*@
diff --git a/src/api/slurm_pmi.c b/src/api/slurm_pmi.c
new file mode 100644
--- /dev/null
+++ b/src/api/slurm_pmi.c
@@ -0,0 +1,118 @@
+/****************************************************************************\
+ *  slurm_pmi.c - PMI support functions internal to SLURM
+ *****************************************************************************
+ *  Copyright (C) 2005 The Regents of the University of California.
+ *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
+ *  Written by Morris Jette <jette1@llnl.gov>.
+ *  UCRL-CODE-2002-040.
+ *
+ *  This file is part of SLURM, a resource management program.
+ *  For details, see <http://www.llnl.gov/linux/slurm/>.
+ *
+ *  SLURM is free software; you can redistribute it and/or modify it under
+ *  the terms of the GNU General Public License as published by the Free
+ *  Software Foundation; either version 2 of the License, or (at your option)
+ *  any later version.
+ *
+ *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
+ *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
+ *  details.
+ *
+ *  You should have received a copy of the GNU General Public License along
+ *  with SLURM; if not, write to the Free Software Foundation, Inc.,
+ *  59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
+\*****************************************************************************/
+
+#include <slurm/slurm.h>
+#include <slurm/slurm_errno.h>
+
+#include "src/api/slurm_pmi.h"
+#include "src/common/slurm_protocol_defs.h"
+#include "src/common/xmalloc.h"
+
+/* Transmit PMI Keyval space data */
+int slurm_send_kvs_comm_set(struct kvs_comm_set *kvs_set_ptr)
+{
+	slurm_msg_t msg;
+	int rc;
+
+	if (kvs_set_ptr == NULL)
+		return EINVAL;
+
+	msg.msg_type = PMI_KVS_PUT_REQ;
+	msg.data = (void *) kvs_set_ptr;
+
+	/* Send the RPC to the local slurmd_step manager */
+/* FIXME, sending to slurmctld right now */
+/* The RPC has been verified to function properly */
+#if 0
+	if (slurm_send_recv_controller_rc_msg(&msg, &rc) < 0)
+		return SLURM_FAILURE;
+
+	if (rc)
+		slurm_seterrno_ret(rc);
+#endif
+	return SLURM_SUCCESS;
+}
+
+/* Wait for barrier and get full PMI Keyval space data */
+int slurm_get_kvs_comm_set(struct kvs_comm_set **kvs_set_ptr)
+{
+	slurm_msg_t req_msg;
+	slurm_msg_t resp_msg;
+
+	if (kvs_set_ptr == NULL)
+		return EINVAL;
+	req_msg.msg_type = PMI_KVS_GET_REQ;
+	req_msg.data = NULL;
+
+	/* Send the RPC to the local slurmd_step manager */
+/* FIXME, sending to slurmctld right now */
+#if 0
+	if (slurm_send_recv_controller_msg(&req_msg, &resp_msg) < 0)
+		return SLURM_ERROR;
+
+	slurm_free_cred(resp_msg.cred);
+	if (resp_msg.msg_type == PMI_KVS_GET_RESP)
+		*kvs_set_ptr = (struct kvs_comm_set *) resp_msg.data;
+	else
+		slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR);
+#else
+	*kvs_set_ptr = NULL;
+#endif
+	return SLURM_SUCCESS;
+}
+
+/* Free a single kvs_comm record, including the record itself */
+static void _free_kvs_comm(struct kvs_comm *kvs_comm_ptr)
+{
+	int i;
+
+	if (kvs_comm_ptr == NULL)
+		return;
+
+	for (i=0; i<kvs_comm_ptr->kvs_cnt; i++) {
+		xfree(kvs_comm_ptr->kvs_keys[i]);
+		xfree(kvs_comm_ptr->kvs_values[i]);
+	}
+	xfree(kvs_comm_ptr->kvs_name);
+	xfree(kvs_comm_ptr->kvs_keys);
+	xfree(kvs_comm_ptr->kvs_values);
+	xfree(kvs_comm_ptr);	/* record was xmalloc'ed by the unpack code */
+}
+
+/* Free kvs_comm_set returned by slurm_get_kvs_comm_set() */
+void slurm_free_kvs_comm_set(struct kvs_comm_set *kvs_set_ptr)
+{
+	int i;
+
+	if (kvs_set_ptr == NULL)
+		return;
+
+	for (i=0; i<kvs_set_ptr->kvs_comm_recs; i++)
+		_free_kvs_comm(kvs_set_ptr->kvs_comm_ptr[i]);
+	xfree(kvs_set_ptr->kvs_comm_ptr);
+	xfree(kvs_set_ptr);
+}
+
diff --git a/src/api/slurm_pmi.h b/src/api/slurm_pmi.h
new file mode 100644
index 0000000000000000000000000000000000000000..c084b77fa0fa48ed789b2c0c48b9921b54ab8540
--- /dev/null
+++ b/src/api/slurm_pmi.h
@@ -0,0 +1,70 @@
+/****************************************************************************\
+ *  slurm_pmi.h - definitions PMI support functions internal to SLURM
+ *****************************************************************************
+ *  Copyright (C) 2005 The Regents of the University of California.
+ *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
+ *  Written by Morris Jette <jette1@llnl.gov>.
+ *  UCRL-CODE-2002-040.
+ *
+ *  This file is part of SLURM, a resource management program.
+ *  For details, see <http://www.llnl.gov/linux/slurm/>.
+ *
+ *  SLURM is free software; you can redistribute it and/or modify it under
+ *  the terms of the GNU General Public License as published by the Free
+ *  Software Foundation; either version 2 of the License, or (at your option)
+ *  any later version.
+ *
+ *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
+ *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
+ *  details.
+ *
+ *  You should have received a copy of the GNU General Public License along
+ *  with SLURM; if not, write to the Free Software Foundation, Inc.,
+ *  59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
+\*****************************************************************************/
+
+#ifndef _SLURM_PMI_H
+#define _SLURM_PMI_H
+
+#if HAVE_CONFIG_H
+#  include "config.h"
+#  if HAVE_INTTYPES_H
+#    include <inttypes.h>
+#  else
+#    if HAVE_STDINT_H
+#      include <stdint.h>
+#    endif
+#  endif  /* HAVE_INTTYPES_H */
+#else   /* !HAVE_CONFIG_H */
+#  include <inttypes.h>
+#endif  /* HAVE_CONFIG_H */
+
+#include "src/common/pack.h"
+
+#define PMI_MAX_KEY_LEN     256	/* Maximum size of a PMI key */
+#define PMI_MAX_KVSNAME_LEN 256	/* Maximum size of KVS name */
+#define PMI_MAX_VAL_LEN     256	/* Maximum size of a PMI value */
+
+struct kvs_comm {
+	char *		kvs_name;
+	uint16_t	kvs_cnt;	/* count of key-pairs */
+	char **		kvs_keys;
+	char **		kvs_values;
+};
+struct kvs_comm_set {
+	uint16_t	task_id;	/* job step's task id */
+	uint16_t	kvs_comm_recs;	/* count of kvs_comm entries */
+	struct kvs_comm **kvs_comm_ptr;	/* pointers to kvs_comm entries */
+};
+
+/* Transmit PMI Keyval space data */
+int slurm_send_kvs_comm_set(struct kvs_comm_set *kvs_set_ptr);
+
+/* Wait for barrier and get full PMI Keyval space data */
+int  slurm_get_kvs_comm_set(struct kvs_comm_set **kvs_set_ptr);
+
+/* Free kvs_comm_set returned by slurm_get_kvs_comm_set() */
+void slurm_free_kvs_comm_set(struct kvs_comm_set *kvs_set_ptr);
+
+#endif
diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h
index b0e27780d86726caaf8257787ffe5122b6b2ffa1..cc209f3bcb938034421f1392b4ef57338f449f62 100644
--- a/src/common/slurm_protocol_defs.h
+++ b/src/common/slurm_protocol_defs.h
@@ -148,6 +148,11 @@ typedef enum {
 	SRUN_TIMEOUT,
 	SRUN_NODE_FAIL,
 
+	PMI_KVS_PUT_REQ = 7201,
+	PMI_KVS_PUT_RESP,
+	PMI_KVS_GET_REQ,
+	PMI_KVS_GET_RESP,
+
 	RESPONSE_SLURM_RC = 8001,
 	MESSAGE_UPLOAD_ACCOUNTING_INFO,
 	MESSAGE_JOBACCT_DATA,
diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c
--- a/src/common/slurm_protocol_pack.c
+++ b/src/common/slurm_protocol_pack.c
@@ -35,6 +35,7 @@
 #include <stdlib.h>
 #include <string.h>
 
+#include "src/api/slurm_pmi.h"
 #include "src/common/bitstring.h"
 #include "src/common/log.h"
 #include "src/common/node_select.h"
@@ -261,6 +262,12 @@ static void _pack_jobacct_data(jobacct_msg_t * msg , Buf buffer );
 static int _unpack_jobacct_data(jobacct_msg_t ** msg_ptr , Buf buffer );
 
 
+static void _dump_kvs_data(struct kvs_comm_set *msg);
+static void _pack_kvs_rec(struct kvs_comm *msg_ptr, Buf buffer);
+static int  _unpack_kvs_rec(struct kvs_comm **msg_ptr, Buf buffer);
+static void _pack_kvs_data(struct kvs_comm_set *msg_ptr, Buf buffer);
+static int  _unpack_kvs_data(struct kvs_comm_set **msg_ptr, Buf buffer);
+
 /* pack_header
  * packs a slurm protocol header that proceeds every slurm message
  * IN header - the header structure to pack
@@ -535,6 +542,13 @@ pack_msg(slurm_msg_t const *msg, Buf buffer)
 	 case MESSAGE_JOBACCT_DATA:
 		_pack_jobacct_data((jobacct_msg_t *) msg->data, buffer);
 		break;
+	 case PMI_KVS_PUT_REQ:
+	 case PMI_KVS_GET_RESP:
+		_pack_kvs_data((struct kvs_comm_set *) msg->data, buffer);
+		break;
+	 case PMI_KVS_GET_REQ:
+	 case PMI_KVS_PUT_RESP:
+		break;	/* no data */
 	 default:
 		debug("No pack method for msg type %i", msg->msg_type);
 		return EINVAL;
@@ -806,6 +820,14 @@ unpack_msg(slurm_msg_t * msg, Buf buffer)
 		rc = _unpack_jobacct_data( (jobacct_msg_t **) & msg->data,
 					   buffer);
 		break;
+	 case PMI_KVS_PUT_REQ:
+	 case PMI_KVS_GET_RESP:
+		rc = _unpack_kvs_data((struct kvs_comm_set **) &msg->data,
+				buffer);
+		break;
+	 case PMI_KVS_GET_REQ:
+	 case PMI_KVS_PUT_RESP:
+		break;	/* no data */
 	 default:
 		debug("No unpack method for msg type %i", msg->msg_type);
 		return EINVAL;
@@ -3263,6 +3285,119 @@ static int _unpack_jobacct_data(jobacct_msg_t ** msg_ptr , Buf buffer )
 	return SLURM_ERROR;
 }
 
+/* Pack one keyval space: name, pair count, then each key/value pair */
+static void _pack_kvs_rec(struct kvs_comm *msg_ptr, Buf buffer)
+{
+	int i;
+	xassert(msg_ptr != NULL);
+
+	packstr(msg_ptr->kvs_name, buffer);
+	pack16(msg_ptr->kvs_cnt, buffer);
+	for (i=0; i<msg_ptr->kvs_cnt; i++) {
+		packstr(msg_ptr->kvs_keys[i], buffer);
+		packstr(msg_ptr->kvs_values[i], buffer);
+	}
+}
+
+/* Unpack one keyval space into a newly allocated record.
+ * On failure the partially filled record is left in *msg_ptr
+ * for the caller to release. */
+static int _unpack_kvs_rec(struct kvs_comm **msg_ptr, Buf buffer)
+{
+	uint16_t uint16_tmp;
+	int i;
+	struct kvs_comm *msg;
+
+	msg = xmalloc(sizeof(struct kvs_comm));
+	*msg_ptr = msg;
+	safe_unpackstr_xmalloc(&msg->kvs_name, &uint16_tmp, buffer);
+	safe_unpack16(&msg->kvs_cnt, buffer);
+	msg->kvs_keys   = xmalloc(sizeof(char *) * msg->kvs_cnt);
+	msg->kvs_values = xmalloc(sizeof(char *) * msg->kvs_cnt);
+	for (i=0; i<msg->kvs_cnt; i++) {
+		safe_unpackstr_xmalloc(&msg->kvs_keys[i],
+				&uint16_tmp, buffer);
+		safe_unpackstr_xmalloc(&msg->kvs_values[i],
+				&uint16_tmp, buffer);
+	}
+	return SLURM_SUCCESS;
+
+unpack_error:
+	return SLURM_ERROR;
+}
+
+/* Pack task id, record count, then every keyval space record */
+static void _pack_kvs_data(struct kvs_comm_set *msg_ptr, Buf buffer)
+{
+	int i;
+	xassert(msg_ptr != NULL);
+
+	pack16(msg_ptr->task_id, buffer);
+	pack16(msg_ptr->kvs_comm_recs, buffer);
+	for (i=0; i<msg_ptr->kvs_comm_recs; i++)
+		_pack_kvs_rec(msg_ptr->kvs_comm_ptr[i], buffer);
+}
+
+/* Log the full contents of a keyval space set via info() */
+static void _dump_kvs_data(struct kvs_comm_set *msg)
+{
+	int i, j;
+
+	info("KVS: task:%u, recs:%u", msg->task_id, msg->kvs_comm_recs);
+	for (i=0; i<msg->kvs_comm_recs; i++) {
+		info("KVS: name:%s cnt:%u", msg->kvs_comm_ptr[i]->kvs_name,
+			msg->kvs_comm_ptr[i]->kvs_cnt);
+		for (j=0; j<msg->kvs_comm_ptr[i]->kvs_cnt; j++) {
+			info("KVS: %s=%s", msg->kvs_comm_ptr[i]->kvs_keys[j],
+				msg->kvs_comm_ptr[i]->kvs_values[j]);
+		}
+	}
+}
+
+/* Unpack a keyval space set into newly allocated storage.
+ * On failure everything unpacked so far is freed and *msg_ptr is NULL. */
+static int _unpack_kvs_data(struct kvs_comm_set **msg_ptr, Buf buffer)
+{
+	struct kvs_comm_set *msg;
+	struct kvs_comm *rec;
+	int i, j;
+
+	msg = xmalloc(sizeof(struct kvs_comm_set));
+	*msg_ptr = msg;
+	safe_unpack16(&msg->task_id, buffer);
+	safe_unpack16(&msg->kvs_comm_recs, buffer);
+	/* kvs_comm_ptr is an array of pointers, not of records */
+	msg->kvs_comm_ptr = xmalloc(sizeof(struct kvs_comm *) *
+			msg->kvs_comm_recs);
+	for (i=0; i<msg->kvs_comm_recs; i++) {
+		if (_unpack_kvs_rec(&msg->kvs_comm_ptr[i], buffer))
+			goto unpack_error;
+	}
+	_dump_kvs_data(msg);
+	return SLURM_SUCCESS;
+
+unpack_error:
+	/* Assumes xmalloc() returns zeroed memory (TODO confirm), as the
+	 * per-key cleanup always has: slots after the failing record are
+	 * then still NULL and must not be dereferenced */
+	for (i=0; (msg->kvs_comm_ptr && (i<msg->kvs_comm_recs)); i++) {
+		rec = msg->kvs_comm_ptr[i];
+		if (rec == NULL)
+			continue;
+		xfree(rec->kvs_name);
+		for (j=0; (rec->kvs_keys && (j<rec->kvs_cnt)); j++)
+			xfree(rec->kvs_keys[j]);
+		for (j=0; (rec->kvs_values && (j<rec->kvs_cnt)); j++)
+			xfree(rec->kvs_values[j]);
+		xfree(rec->kvs_keys);
+		xfree(rec->kvs_values);
+		xfree(rec);
+	}
+	xfree(msg->kvs_comm_ptr);
+	xfree(msg);
+	*msg_ptr = NULL;
+	return SLURM_ERROR;
+}
+
 /* template 
 void pack_ ( * msg , Buf buffer )
 {