diff --git a/src/api/Makefile.am b/src/api/Makefile.am index 41baf9761768e918c13d24a0081eb2b55fbfc45c..d2e4011d5db1b149e7c09b5f7a44e9b544a3a79a 100644 --- a/src/api/Makefile.am +++ b/src/api/Makefile.am @@ -65,7 +65,7 @@ slurmapi_src = \ step_ctx.c step_ctx.h \ step_io.c step_io.h \ step_launch.c \ - step_pmi.c step_pmi.h \ + pmi_server.c pmi_server.h \ submit.c \ suspend.c \ reconfigure.c \ diff --git a/src/api/Makefile.in b/src/api/Makefile.in index 7d4be77a41657897f6f22758ff6628754a13b1fd..2d7f794718686680258886527ed3672931b2ea39 100644 --- a/src/api/Makefile.in +++ b/src/api/Makefile.in @@ -88,7 +88,7 @@ am__objects_1 = allocate.lo cancel.lo checkpoint.lo complete.lo \ config_info.lo init_msg.lo job_info.lo job_step_info.lo \ node_info.lo node_select_info.lo partition_info.lo signal.lo \ slurm_pmi.lo spawn.lo step_ctx.lo step_io.lo step_launch.lo \ - step_pmi.lo submit.lo suspend.lo reconfigure.lo \ + pmi_server.lo submit.lo suspend.lo reconfigure.lo \ update_config.lo am_libslurm_la_OBJECTS = $(am__objects_1) libslurm_la_OBJECTS = $(am_libslurm_la_OBJECTS) @@ -351,7 +351,7 @@ slurmapi_src = \ step_ctx.c step_ctx.h \ step_io.c step_io.h \ step_launch.c \ - step_pmi.c step_pmi.h \ + pmi_server.c pmi_server.h \ submit.c \ suspend.c \ reconfigure.c \ @@ -484,6 +484,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/node_select_info.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/partition_info.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pmi.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pmi_server.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/reconfigure.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/signal.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/slurm_pmi.Plo@am__quote@ @@ -491,7 +492,6 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/step_ctx.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/step_io.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/step_launch.Plo@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/step_pmi.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/submit.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/suspend.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/update_config.Plo@am__quote@ diff --git a/src/api/step_pmi.c b/src/api/pmi_server.c similarity index 98% rename from src/api/step_pmi.c rename to src/api/pmi_server.c index 83ad5754c1e6f5e39f58fc1279c783a493cada0c..142823af5a417eaefed763e159777e0c31a22eb3 100644 --- a/src/api/step_pmi.c +++ b/src/api/pmi_server.c @@ -70,7 +70,7 @@ struct msg_arg { struct kvs_comm_set *kvs_ptr; }; int agent_cnt = 0; /* number of active message agents */ -int agent_max_cnt = 0; /* maximum number of active agents */ +int agent_max_cnt = 32; /* maximum number of active agents */ static void *_agent(void *x); static struct kvs_comm *_find_kvs_by_name(char *name); @@ -385,7 +385,10 @@ fini: pthread_mutex_unlock(&kvs_mutex); * The PMI server code is used interally by the slurm_step_launch() function * to support MPI libraries that bootstrap themselves using PMI. */ -extern void slurm_pmi_server_max_threads(int max_threads) +extern void pmi_server_max_threads(int max_threads) { - agent_max_cnt = max_threads; + if (max_threads <= 0) + error("pmi server max threads must be greater than zero"); + else + agent_max_cnt = max_threads; } diff --git a/src/api/step_pmi.h b/src/api/pmi_server.h similarity index 97% rename from src/api/step_pmi.h rename to src/api/pmi_server.h index fd71e811adafe9a786d1510c63d3275661df582e..c8da89ee8442e870baaf3e153fc43000b4ebb632 100644 --- a/src/api/step_pmi.h +++ b/src/api/pmi_server.h @@ -42,6 +42,6 @@ extern int pmi_kvs_get(kvs_get_msg_t *kvs_get_ptr); * The PMI server code is used interally by the slurm_step_launch() function * to support MPI libraries that bootstrap themselves using PMI. */ -extern void slurm_pmi_server_max_threads(int max_threads); +extern void pmi_server_max_threads(int max_threads); #endif diff --git a/src/api/step_launch.c b/src/api/step_launch.c index b6ada41e898a069b2dff3558a70205dafdf12033..88de63e9307815a4969fea6499846a183b2ce38d 100644 --- a/src/api/step_launch.c +++ b/src/api/step_launch.c @@ -60,7 +60,7 @@ #include "src/common/slurm_cred.h" #include "src/api/step_ctx.h" -#include "src/api/step_pmi.h" +#include "src/api/pmi_server.h" /********************************************************************** * General declarations for step launch code diff --git a/src/slaunch/opt.c b/src/slaunch/opt.c index 149dec0a7dcc68e6295087026193354057547e50..e3e746485bebf7b4249a69ad4612c9fd14cceffd 100644 --- a/src/slaunch/opt.c +++ b/src/slaunch/opt.c @@ -73,7 +73,7 @@ #include "src/common/read_config.h" /* getnodename() */ #include "src/common/hostlist.h" #include "src/common/mpi.h" -#include "src/api/step_pmi.h" +#include "src/api/pmi_server.h" #include "src/slaunch/attach.h" @@ -1379,7 +1379,7 @@ void set_options(const int argc, char **argv) if (max <= 0) error("--pmi-threads must be a positive integer"); else - slurm_pmi_server_max_threads(max); + pmi_server_max_threads(max); } break; default: diff --git a/src/srun/Makefile.am b/src/srun/Makefile.am index b9965bda0a5a2fdf610fdd33ea06d1b0e9e7e717..b055043f4f013761348ddbb29a4601b362c6919b 100644 --- a/src/srun/Makefile.am +++ b/src/srun/Makefile.am @@ -11,7 +11,6 @@ srun_SOURCES = \ opt.c opt.h \ srun_job.c srun_job.h \ msg.c msg.h \ - pmi.c pmi.h \ signals.c signals.h \ launch.c \ launch.h \ diff --git a/src/srun/Makefile.in b/src/srun/Makefile.in index 2fb72d2b2b236654ef2ff47d6b426d5d3b811670..4ed3b16c0f093e3d5af352ab5957847e210e3c9b 100644 --- a/src/srun/Makefile.in +++ b/src/srun/Makefile.in @@ -73,7 +73,7 @@ am__installdirs = "$(DESTDIR)$(bindir)" binPROGRAMS_INSTALL = $(INSTALL_PROGRAM) PROGRAMS = $(bin_PROGRAMS) am_srun_OBJECTS = srun.$(OBJEXT) opt.$(OBJEXT) srun_job.$(OBJEXT) \ - msg.$(OBJEXT) pmi.$(OBJEXT) signals.$(OBJEXT) launch.$(OBJEXT) \ + msg.$(OBJEXT) signals.$(OBJEXT) launch.$(OBJEXT) \ attach.$(OBJEXT) reattach.$(OBJEXT) fname.$(OBJEXT) \ sigstr.$(OBJEXT) allocate.$(OBJEXT) core-format.$(OBJEXT) \ multi_prog.$(OBJEXT) srun.wrapper.$(OBJEXT) @@ -281,7 +281,6 @@ srun_SOURCES = \ opt.c opt.h \ srun_job.c srun_job.h \ msg.c msg.h \ - pmi.c pmi.h \ signals.c signals.h \ launch.c \ launch.h \ @@ -384,7 +383,6 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/msg.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/multi_prog.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/opt.Po@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pmi.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/reattach.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/signals.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/sigstr.Po@am__quote@ diff --git a/src/srun/msg.c b/src/srun/msg.c index d45012df415bd850754b2aea213ac89df770de7b..f22af6da0c6e4e380fa4151fb08da80932b6df20 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -57,11 +57,11 @@ #include "src/common/mpi.h" #include "src/common/forward.h" #include "src/common/global_srun.h" +#include "src/api/pmi_server.h" #include "src/srun/srun_job.h" #include "src/srun/opt.h" #include "src/srun/msg.h" -#include "src/srun/pmi.h" #include "src/srun/sigstr.h" #include "src/srun/attach.h" #include "src/srun/allocate.h" diff --git a/src/srun/opt.c b/src/srun/opt.c index 3e99cd2628e094618f4266ef5beec11e54f0ae83..68e09861e2ee7cc2985dee26a93c98b702a20065 100644 --- a/src/srun/opt.c +++ b/src/srun/opt.c @@ -696,6 +696,7 @@ static void _opt_default() opt.time_limit = -1; opt.partition = NULL; opt.max_threads = MAX_THREADS; + pmi_server_max_threads(opt.max_threads); opt.relative = NO_VAL; opt.relative_set = false; @@ -780,6 +781,7 @@ static void _opt_default() if ((opt.parallel_debug = _under_parallel_debugger())) { opt.max_launch_time = 120; opt.max_threads = 1; + pmi_server_max_threads(opt.max_threads); opt.msg_timeout = 15; } @@ -1304,6 +1306,7 @@ void set_options(const int argc, char **argv, int first) opt.max_threads = _get_int(optarg, "max_threads"); + pmi_server_max_threads(opt.max_threads); break; case (int)'u': opt.unbuffered = true; @@ -1441,6 +1444,7 @@ void set_options(const int argc, char **argv, int first) MPIR_being_debugged = 1; opt.max_launch_time = 120; opt.max_threads = 1; + pmi_server_max_threads(opt.max_threads); opt.msg_timeout = 15; break; case LONG_OPT_HELP: @@ -1748,6 +1752,7 @@ static bool _opt_verify(void) if (opt.max_threads <= 0) { /* set default */ error("Thread value invalid, reset to 1"); opt.max_threads = 1; + pmi_server_max_threads(opt.max_threads); } else if (opt.max_threads > MAX_THREADS) { error("Thread value exceeds defined limit, reset to %d", MAX_THREADS); diff --git a/src/srun/pmi.c b/src/srun/pmi.c deleted file mode 100644 index bf40d48b71bda09d60ff18bde6fbe73a0fdc7f33..0000000000000000000000000000000000000000 --- a/src/srun/pmi.c +++ /dev/null @@ -1,382 +0,0 @@ -/*****************************************************************************\ - * pmi.c - Global PMI data as maintained within srun - * $Id$ - ***************************************************************************** - * Copyright (C) 2005-2006 The Regents of the University of California. - * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). - * Written by Morris Jette <jette1@llnl.gov> - * UCRL-CODE-217948. - * - * This file is part of SLURM, a resource management program. - * For details, see <http://www.llnl.gov/linux/slurm/>. - * - * SLURM is free software; you can redistribute it and/or modify it under - * the terms of the GNU General Public License as published by the Free - * Software Foundation; either version 2 of the License, or (at your option) - * any later version. - * - * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY - * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS - * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more - * details. - * - * You should have received a copy of the GNU General Public License along - * with SLURM; if not, write to the Free Software Foundation, Inc., - * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. -\*****************************************************************************/ - -#if HAVE_CONFIG_H -# include "config.h" -#endif - -#include <pthread.h> -#include <slurm/slurm_errno.h> - -#include "src/api/slurm_pmi.h" -#include "src/srun/opt.h" -#include "src/common/macros.h" -#include "src/common/slurm_protocol_api.h" -#include "src/common/slurm_protocol_defs.h" -#include "src/common/xsignal.h" -#include "src/common/xstring.h" -#include "src/common/xmalloc.h" - -#define _DEBUG 0 /* non-zero for extra KVS logging */ -#define MSG_TRANSMITS 2 /* transmit KVS messages this number times */ - -/* Global variables */ -pthread_mutex_t kvs_mutex = PTHREAD_MUTEX_INITIALIZER; -int kvs_comm_cnt = 0; -int kvs_updated = 0; -struct kvs_comm **kvs_comm_ptr = NULL; - -struct barrier_resp { - uint16_t port; - char *hostname; -}; /* details for barrier task communcations */ -struct barrier_resp *barrier_ptr = NULL; -uint16_t barrier_resp_cnt = 0; /* tasks having reached barrier */ -uint16_t barrier_cnt = 0; /* tasks needing to reach barrier */ - -pthread_mutex_t agent_mutex = PTHREAD_MUTEX_INITIALIZER; -pthread_cond_t agent_cond = PTHREAD_COND_INITIALIZER; -struct agent_arg { - struct barrier_resp *barrier_xmit_ptr; - int barrier_xmit_cnt; - struct kvs_comm **kvs_xmit_ptr; - int kvs_xmit_cnt; -}; /* details for message agent manager */ -struct msg_arg { - struct barrier_resp *bar_ptr; - struct kvs_comm_set *kvs_ptr; -}; -int agent_cnt = 0; /* number of active message agents */ - -static void *_agent(void *x); -static struct kvs_comm *_find_kvs_by_name(char *name); -struct kvs_comm **_kvs_comm_dup(void); -static void _kvs_xmit_tasks(void); -static void _merge_named_kvs(struct kvs_comm *kvs_orig, - struct kvs_comm *kvs_new); -static void _move_kvs(struct kvs_comm *kvs_new); -static void *_msg_thread(void *x); -static void _print_kvs(void); - -/* Transmit the KVS keypairs to all tasks, waiting at a barrier - * This will take some time, so we work with a copy of the KVS keypairs. - * We also work with a private copy of the barrier data and clear the - * global data pointers so any new barrier requests get treated as - * completely independent of this one. */ -static void _kvs_xmit_tasks(void) -{ - struct agent_arg *args; - pthread_attr_t attr; - pthread_t agent_id; - -#if _DEBUG - info("All tasks at barrier, transmit KVS keypairs now"); -#endif - /* reset barrier info */ - args = xmalloc(sizeof(struct agent_arg)); - args->barrier_xmit_ptr = barrier_ptr; - args->barrier_xmit_cnt = barrier_cnt; - barrier_ptr = NULL; - barrier_resp_cnt = 0; - barrier_cnt = 0; - - /* copy the new kvs data */ - if (kvs_updated) { - args->kvs_xmit_ptr = _kvs_comm_dup(); - args->kvs_xmit_cnt = kvs_comm_cnt; - kvs_updated = 0; - } else { /* No new data to transmit */ - args->kvs_xmit_ptr = xmalloc(0); - args->kvs_xmit_cnt = 0; - } - - /* Spawn a pthread to transmit it */ - slurm_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if (pthread_create(&agent_id, &attr, _agent, (void *) args)) - fatal("pthread_create"); - slurm_attr_destroy(&attr); -} - -static void *_msg_thread(void *x) -{ - struct msg_arg *msg_arg_ptr = (struct msg_arg *) x; - int rc, success = 0, timeout; - slurm_msg_t msg_send; - - - debug2("KVS_Barrier msg to %s:%u", - msg_arg_ptr->bar_ptr->hostname, - msg_arg_ptr->bar_ptr->port); - msg_send.msg_type = PMI_KVS_GET_RESP; - msg_send.data = (void *) msg_arg_ptr->kvs_ptr; - slurm_set_addr(&msg_send.address, - msg_arg_ptr->bar_ptr->port, - msg_arg_ptr->bar_ptr->hostname); - - timeout = slurm_get_msg_timeout() * 8; - if (slurm_send_recv_rc_msg_only_one(&msg_send, &rc, timeout) < 0) { - error("slurm_send_recv_rc_msg_only_one: %m"); - } else if (rc != SLURM_SUCCESS) { - error("KVS_Barrier confirm from %s, rc=%d", - msg_arg_ptr->bar_ptr->hostname, rc); - } else { - /* successfully transmitted KVS keypairs */ - success = 1; - } - - slurm_mutex_lock(&agent_mutex); - agent_cnt--; - if (success) - msg_arg_ptr->bar_ptr->port = 0; - slurm_mutex_unlock(&agent_mutex); - pthread_cond_signal(&agent_cond); - xfree(x); - return NULL; -} - -static void *_agent(void *x) -{ - struct agent_arg *args = (struct agent_arg *) x; - struct kvs_comm_set kvs_set; - struct msg_arg *msg_args; - int i, j; - pthread_t msg_id; - pthread_attr_t attr; - - /* send the messages */ - slurm_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - kvs_set.kvs_comm_recs = args->kvs_xmit_cnt; - kvs_set.kvs_comm_ptr = args->kvs_xmit_ptr; - for (i=0; i<MSG_TRANSMITS; i++) { - for (j=0; j<args->barrier_xmit_cnt; j++) { - if (args->barrier_xmit_ptr[j].port == 0) - continue; - slurm_mutex_lock(&agent_mutex); - while (agent_cnt >= opt.max_threads) - pthread_cond_wait(&agent_cond, &agent_mutex); - agent_cnt++; - slurm_mutex_unlock(&agent_mutex); - - msg_args = xmalloc(sizeof(struct msg_arg)); - msg_args->bar_ptr = &args->barrier_xmit_ptr[j]; - msg_args->kvs_ptr = &kvs_set; - if (pthread_create(&msg_id, &attr, _msg_thread, - (void *) msg_args)) { - fatal("pthread_create: %m"); - } - } - slurm_mutex_lock(&agent_mutex); - while (agent_cnt > 0) - pthread_cond_wait(&agent_cond, &agent_mutex); - slurm_mutex_unlock(&agent_mutex); - } - slurm_attr_destroy(&attr); - - /* Release allocated memory */ - for (i=0; i<args->barrier_xmit_cnt; i++) - xfree(args->barrier_xmit_ptr[i].hostname); - xfree(args->barrier_xmit_ptr); - for (i=0; i<args->kvs_xmit_cnt; i++) { - for (j=0; j<args->kvs_xmit_ptr[i]->kvs_cnt; j++) { - xfree(args->kvs_xmit_ptr[i]->kvs_keys[j]); - xfree(args->kvs_xmit_ptr[i]->kvs_values[j]); - } - xfree(args->kvs_xmit_ptr[i]->kvs_keys); - xfree(args->kvs_xmit_ptr[i]->kvs_values); - xfree(args->kvs_xmit_ptr[i]->kvs_name); - xfree(args->kvs_xmit_ptr[i]); - } - xfree(args->kvs_xmit_ptr); - xfree(args); - return NULL; -} - -/* duplicate the current KVS comm structure */ -struct kvs_comm **_kvs_comm_dup(void) -{ - int i, j; - struct kvs_comm **rc_kvs; - - rc_kvs = xmalloc(sizeof(struct kvs_comm *) * kvs_comm_cnt); - for (i=0; i<kvs_comm_cnt; i++) { - rc_kvs[i] = xmalloc(sizeof(struct kvs_comm)); - rc_kvs[i]->kvs_name = xstrdup(kvs_comm_ptr[i]->kvs_name); - rc_kvs[i]->kvs_cnt = kvs_comm_ptr[i]->kvs_cnt; - rc_kvs[i]->kvs_keys = - xmalloc(sizeof(char *) * rc_kvs[i]->kvs_cnt); - rc_kvs[i]->kvs_values = - xmalloc(sizeof(char *) * rc_kvs[i]->kvs_cnt); - for (j=0; j<rc_kvs[i]->kvs_cnt; j++) { - rc_kvs[i]->kvs_keys[j] = - xstrdup(kvs_comm_ptr[i]->kvs_keys[j]); - rc_kvs[i]->kvs_values[j] = - xstrdup(kvs_comm_ptr[i]->kvs_values[j]); - } - } - return rc_kvs; -} - -/* return pointer to named kvs element or NULL if not found */ -static struct kvs_comm *_find_kvs_by_name(char *name) -{ - int i; - - for (i=0; i<kvs_comm_cnt; i++) { - if (strcmp(kvs_comm_ptr[i]->kvs_name, name)) - continue; - return kvs_comm_ptr[i]; - } - return NULL; -} - -static void _merge_named_kvs(struct kvs_comm *kvs_orig, - struct kvs_comm *kvs_new) -{ - int i, j; - - for (i=0; i<kvs_new->kvs_cnt; i++) { - for (j=0; j<kvs_orig->kvs_cnt; j++) { - if (strcmp(kvs_new->kvs_keys[i], kvs_orig->kvs_keys[j])) - continue; - xfree(kvs_orig->kvs_values[j]); - kvs_orig->kvs_values[j] = kvs_new->kvs_values[i]; - kvs_new->kvs_values[i] = NULL; - break; - } - if (j < kvs_orig->kvs_cnt) - continue; /* already recorded, update */ - /* append it */ - kvs_orig->kvs_cnt++; - xrealloc(kvs_orig->kvs_keys, - (sizeof(char *) * kvs_orig->kvs_cnt)); - xrealloc(kvs_orig->kvs_values, - (sizeof(char *) * kvs_orig->kvs_cnt)); - kvs_orig->kvs_keys[kvs_orig->kvs_cnt-1] = kvs_new->kvs_keys[i]; - kvs_orig->kvs_values[kvs_orig->kvs_cnt-1] = - kvs_new->kvs_values[i]; - kvs_new->kvs_keys[i] = NULL; - kvs_new->kvs_values[i] = NULL; - } -} - -static void _move_kvs(struct kvs_comm *kvs_new) -{ - kvs_comm_ptr = xrealloc(kvs_comm_ptr, (sizeof(struct kvs_comm *) * - (kvs_comm_cnt + 1))); - kvs_comm_ptr[kvs_comm_cnt] = kvs_new; - kvs_comm_cnt++; -} - -static void _print_kvs(void) -{ -#if _DEBUG - int i, j; - - info("KVS dump start"); - for (i=0; i<kvs_comm_cnt; i++) { - for (j=0; j<kvs_comm_ptr[i]->kvs_cnt; j++) { - info("KVS: %s:%s:%s", kvs_comm_ptr[i]->kvs_name, - kvs_comm_ptr[i]->kvs_keys[j], - kvs_comm_ptr[i]->kvs_values[j]); - } - } -#endif -} - -extern int pmi_kvs_put(struct kvs_comm_set *kvs_set_ptr) -{ - int i; - struct kvs_comm *kvs_ptr; - - /* Merge new data with old. - * NOTE: We just move pointers rather than copy data where - * possible for improved performance */ - pthread_mutex_lock(&kvs_mutex); - for (i=0; i<kvs_set_ptr->kvs_comm_recs; i++) { - kvs_ptr = _find_kvs_by_name(kvs_set_ptr-> - kvs_comm_ptr[i]->kvs_name); - if (kvs_ptr) { - _merge_named_kvs(kvs_ptr, - kvs_set_ptr->kvs_comm_ptr[i]); - } else { - _move_kvs(kvs_set_ptr->kvs_comm_ptr[i]); - kvs_set_ptr-> kvs_comm_ptr[i] = NULL; - } - } - slurm_free_kvs_comm_set(kvs_set_ptr); - _print_kvs(); - kvs_updated = 1; - pthread_mutex_unlock(&kvs_mutex); - return SLURM_SUCCESS; -} - -extern int pmi_kvs_get(kvs_get_msg_t *kvs_get_ptr) -{ - int rc = SLURM_SUCCESS; - -#if _DEBUG - info("pmi_kvs_get: rank:%u size:%u port:%u, host:%s", - kvs_get_ptr->task_id, kvs_get_ptr->size, - kvs_get_ptr->port, kvs_get_ptr->hostname); -#endif - if (kvs_get_ptr->size == 0) { - error("PMK_KVS_Barrier reached with size == 0"); - return SLURM_ERROR; - } - - pthread_mutex_lock(&kvs_mutex); - if (barrier_cnt == 0) { - barrier_cnt = kvs_get_ptr->size; - barrier_ptr = xmalloc(sizeof(struct barrier_resp)*barrier_cnt); - } else if (barrier_cnt != kvs_get_ptr->size) { - error("PMK_KVS_Barrier task count inconsistent (%u != %u)", - barrier_cnt, kvs_get_ptr->size); - rc = SLURM_ERROR; - goto fini; - } - if (kvs_get_ptr->task_id >= barrier_cnt) { - error("PMK_KVS_Barrier task count(%u) >= size(%u)", - kvs_get_ptr->task_id, barrier_cnt); - rc = SLURM_ERROR; - goto fini; - } - if (barrier_ptr[kvs_get_ptr->task_id].port == 0) - barrier_resp_cnt++; - else - error("PMK_KVS_Barrier duplicate request from task %u", - kvs_get_ptr->task_id); - barrier_ptr[kvs_get_ptr->task_id].port = kvs_get_ptr->port; - barrier_ptr[kvs_get_ptr->task_id].hostname = kvs_get_ptr->hostname; - kvs_get_ptr->hostname = NULL; /* just moved the pointer */ - if (barrier_resp_cnt == barrier_cnt) - _kvs_xmit_tasks(); -fini: pthread_mutex_unlock(&kvs_mutex); - return rc; -} - diff --git a/src/srun/pmi.h b/src/srun/pmi.h deleted file mode 100644 index 93b9eeb534fd8f7511b90f96c4d55966d5c93768..0000000000000000000000000000000000000000 --- a/src/srun/pmi.h +++ /dev/null @@ -1,40 +0,0 @@ -/*****************************************************************************\ - * pmi.h - Global PMI data as maintained within srun - * $Id$ - ***************************************************************************** - * Copyright (C) 2005 The Regents of the University of California. - * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). - * Written by Morris Jette <jette1@llnl.gov> - * UCRL-CODE-217948. - * - * This file is part of SLURM, a resource management program. - * For details, see <http://www.llnl.gov/linux/slurm/>. - * - * SLURM is free software; you can redistribute it and/or modify it under - * the terms of the GNU General Public License as published by the Free - * Software Foundation; either version 2 of the License, or (at your option) - * any later version. - * - * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY - * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS - * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more - * details. - * - * You should have received a copy of the GNU General Public License along - * with SLURM; if not, write to the Free Software Foundation, Inc., - * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. -\*****************************************************************************/ - -#ifndef _SRUN_PMI_H -#define _SRUN_PMI_H - -#include "src/api/slurm_pmi.h" - -/* Put the supplied kvs values into the common store */ -extern int pmi_kvs_put(struct kvs_comm_set *kvs_set_ptr); - -/* Note that a task has reached a barrier, - * transmit the kvs values to the task */ -extern int pmi_kvs_get(kvs_get_msg_t *kvs_get_ptr); - -#endif