diff --git a/NEWS b/NEWS index dcabbbd23a2ce0d7622fefc3c625bf5dd4feb35c..378d878fe6196ccc9be0a1ddf52b6557b8828575 100644 --- a/NEWS +++ b/NEWS @@ -61,6 +61,8 @@ documents those changes that are of interest to users and admins. -- In select/cons_res properly release resources allocated to job being suspended (rmbreak.patch, from Chris Holmes, HP). -- Fix AIX linking problem for PMI (mpich2) support. + -- Improve PMI logic for greater scalability (up to 16k tasks run). + -- Add srun support for SLURM_THREADS and PMI_FANOUT environment variables. * Changes in SLURM 1.2.16 ========================= diff --git a/doc/man/man1/srun.1 b/doc/man/man1/srun.1 index 66d4d2b5a65ba8e1182f2f0a3b3a59edef290047..9414d617922ef6fb3682ddeab180325eb4a70379 100644 --- a/doc/man/man1/srun.1 +++ b/doc/man/man1/srun.1 @@ -1059,12 +1059,24 @@ These environment variables, along with their corresponding options, are listed below. Note: Command line options will always override these settings. .TP 22 +\fBPMI_FANOUT\fR +This is used exclusively with PMI (MPICH2 and MVAPICH2) and +controls the fanout of data communications. The srun command +sends messages to application programs (via the PMI library) +and those applications may be called upon to forward that +data to up to this number of additional tasks. Higher values +offload work from the srun command to the applications and +likely increase the vulernability to failures. +The default value is 32. +.TP \fBPMI_TIME\fR This is used exclusively with PMI (MPICH2 and MVAPICH2) and -controls how the much the communications from the tasks to the +controls how much the communications from the tasks to the srun are spread out in time in order to avoid overwhelming the srun command with work. The default value is 500 (microseconds) -per task. On relatively slow processors, higher values may be required. +per task. On relatively slow processors or systems with very +large processsor counts (and large PMI data sets), higher values +may be required. .TP \fBSLURM_CONF\fR The location of the SLURM configuration file. @@ -1174,11 +1186,14 @@ Same as \fB\-\-task\-epilog\fR=\fIexecutable\fR \fBSLURM_TASK_PROLOG\fR Same as \fB\-\-task\-prolog\fR=\fIexecutable\fR .TP +\fBSLURM_THREADS\fR +Same as \fB\-T, \-\-threads\fR +.TP \fBSLURM_TIMELIMIT\fR Same as \fB\-t, \-\-time\fR=\fIminutes\fR .TP \fBSLURM_UNBUFFEREDIO\fR -Same as \fB-u, --unbuffered\fR +Same as \fB\-u, \-\-unbuffered\fR .TP \fBSLURM_WAIT\fR Same as \fB\-W, \-\-wait\fR=\fIseconds\fR diff --git a/src/api/pmi_server.c b/src/api/pmi_server.c index 94c4f9a7880ec662603ddb127156d7bfb5caeea3..698e0a166f40ec92f5282d134cb14bd5ddfa52c3 100644 --- a/src/api/pmi_server.c +++ b/src/api/pmi_server.c @@ -30,6 +30,7 @@ #endif #include <pthread.h> +#include <stdlib.h> #include <slurm/slurm_errno.h> #include "src/api/slurm_pmi.h" @@ -42,7 +43,7 @@ #include "src/common/xmalloc.h" #define _DEBUG 0 /* non-zero for extra KVS logging */ -#define PMI_FANOUT 32 /* max fanout for PMI msg forwarding */ +#define _DEBUG_TIMING 0 /* non-zero for KVS timing details */ static pthread_mutex_t kvs_mutex = PTHREAD_MUTEX_INITIALIZER; static int kvs_comm_cnt = 0; @@ -179,12 +180,20 @@ static void *_agent(void *x) struct kvs_comm_set *kvs_set; struct msg_arg *msg_args; struct kvs_hosts *kvs_host_list; - int i, j, kvs_set_cnt = 0, host_cnt; + int i, j, kvs_set_cnt = 0, host_cnt, pmi_fanout = 32; int msg_sent = 0, max_forward = 0; + char *tmp; pthread_t msg_id; pthread_attr_t attr; DEF_TIMERS; + tmp = getenv("PMI_FANOUT"); + if (tmp) { + pmi_fanout = atoi(tmp); + if (pmi_fanout < 1) + pmi_fanout = 32; + } + /* only send one message to each host, * build table of the ports on each host */ START_TIMER; @@ -194,9 +203,9 @@ static void *_agent(void *x) for (i=0; i<args->barrier_xmit_cnt; i++) { if (args->barrier_xmit_ptr[i].port == 0) continue; /* already sent message to host */ - kvs_host_list = xmalloc(sizeof(struct kvs_hosts) * PMI_FANOUT); + kvs_host_list = xmalloc(sizeof(struct kvs_hosts) * pmi_fanout); host_cnt = 0; -#if PMI_FANOUT + /* This code enables key-pair forwarding between * tasks. First task on the node gets the key-pairs * with host/port information for all other tasks on @@ -214,10 +223,10 @@ static void *_agent(void *x) args->barrier_xmit_ptr[j].hostname; args->barrier_xmit_ptr[j].port = 0;/* don't reissue */ host_cnt++; - if (host_cnt >= PMI_FANOUT) + if (host_cnt >= pmi_fanout) break; } -#endif + msg_sent++; max_forward = MAX(host_cnt, max_forward); @@ -413,6 +422,11 @@ extern int pmi_kvs_put(struct kvs_comm_set *kvs_set_ptr) extern int pmi_kvs_get(kvs_get_msg_t *kvs_get_ptr) { int rc = SLURM_SUCCESS; +#if _DEBUG_TIMING + static uint32_t tm[10000]; + int cur_time, i; + struct timeval tv; +#endif #if _DEBUG info("pmi_kvs_get: rank:%u size:%u port:%u, host:%s", @@ -423,7 +437,12 @@ extern int pmi_kvs_get(kvs_get_msg_t *kvs_get_ptr) error("PMK_KVS_Barrier reached with size == 0"); return SLURM_ERROR; } - +#if _DEBUG_TIMING + gettimeofday(&tv, NULL); + cur_time = (tv.tv_sec % 1000) + tv.tv_usec; + if (kvs_get_ptr->task_id < 10000) + tm[kvs_get_ptr->task_id] = cur_time; +#endif pthread_mutex_lock(&kvs_mutex); if (barrier_cnt == 0) { barrier_cnt = kvs_get_ptr->size; @@ -448,9 +467,18 @@ extern int pmi_kvs_get(kvs_get_msg_t *kvs_get_ptr) barrier_ptr[kvs_get_ptr->task_id].port = kvs_get_ptr->port; barrier_ptr[kvs_get_ptr->task_id].hostname = kvs_get_ptr->hostname; kvs_get_ptr->hostname = NULL; /* just moved the pointer */ - if (barrier_resp_cnt == barrier_cnt) + if (barrier_resp_cnt == barrier_cnt) { +#if _DEBUG_TIMING + info("task[%d] at %u", 0, tm[0]); + for (i=1; ((i<barrier_cnt) && (i<10000)); i++) { + cur_time = (int) tm[i] - (int) tm[i-1]; + info("task[%d] at %u diff %d", i, tm[i], cur_time); + } +#endif _kvs_xmit_tasks(); +} fini: pthread_mutex_unlock(&kvs_mutex); + return rc; } diff --git a/src/api/slurm_pmi.c b/src/api/slurm_pmi.c index 70959b24255cdab70374e955f54d88f155b8b808..f1c2454e455396d730b9c936cffeb16dda1ad2b9 100644 --- a/src/api/slurm_pmi.c +++ b/src/api/slurm_pmi.c @@ -36,6 +36,7 @@ \*****************************************************************************/ #include <stdlib.h> +#include <sys/time.h> #include <slurm/slurm.h> #include <slurm/slurm_errno.h> @@ -48,17 +49,77 @@ #include "src/common/fd.h" #include "src/common/slurm_auth.h" -#define MAX_RETRIES 5 +#define DEFAULT_PMI_TIME 500 +#define MAX_RETRIES 5 int pmi_fd = -1; int pmi_time = 0; uint16_t srun_port = 0; slurm_addr srun_addr; -static int _forward_comm_set(struct kvs_comm_set *kvs_set_ptr); -static int _get_addr(void); +static void _delay_rpc(int pmi_rank, int pmi_size); +static int _forward_comm_set(struct kvs_comm_set *kvs_set_ptr); +static int _get_addr(void); static void _set_pmi_time(void); +/* Delay an RPC to srun in order to avoid overwhelming the srun command. + * The delay is based upon the number of tasks, this task's rank, and PMI_TIME. + * This logic depends upon synchronized clocks across the cluster. */ +static void _delay_rpc(int pmi_rank, int pmi_size) +{ + struct timeval tv1, tv2; + uint32_t cur_time; /* current time in usec (just 9 digits) */ + uint32_t tot_time; /* total time expected for all RPCs */ + uint32_t offset_time; /* relative time within tot_time */ + uint32_t target_time; /* desired time to issue the RPC */ + uint32_t delta_time, error_time; + int retries = 0; + + _set_pmi_time(); + +again: if (gettimeofday(&tv1, NULL)) { + usleep(pmi_rank * pmi_time); + return; + } + + cur_time = (tv1.tv_sec % 1000) + tv1.tv_usec; + tot_time = pmi_size * pmi_time; + offset_time = cur_time % tot_time; + target_time = pmi_rank * pmi_time; + if (target_time < offset_time) + delta_time = target_time - offset_time + tot_time; + else + delta_time = target_time - offset_time; + if (usleep(delta_time)) { + if (errno == EINVAL) + usleep(900000); + /* errno == EINTR */ + goto again; + } + + /* Verify we are active at the right time. If current time is different + * from target by more than 15*pmi_time, then start over. If PMI_TIME + * is set appropriately, then srun should have no more than 30 RPCs + * in the queue at one time in the worst case. */ + if (gettimeofday(&tv2, NULL)) + return; + tot_time = (tv2.tv_sec - tv1.tv_sec) * 1000000; + tot_time += tv2.tv_usec; + tot_time -= tv1.tv_usec; + if (tot_time >= delta_time) + error_time = tot_time - delta_time; + else + error_time = delta_time - tot_time; + if (error_time > (15*pmi_time)) { /* too far off */ +#if 0 + info("delta=%u tot=%u err=%u", + delta_time, tot_time, error_time); +#endif + if ((++retries) <= 2) + goto again; + } +} + static int _get_addr(void) { char *env_host, *env_port; @@ -85,14 +146,14 @@ static void _set_pmi_time(void) tmp = getenv("PMI_TIME"); if (tmp == NULL) { - pmi_time = 500; + pmi_time = DEFAULT_PMI_TIME; return; } pmi_time = strtol(tmp, &endptr, 10); if ((pmi_time < 0) || (endptr[0] != '\0')) { error("Invalid PMI_TIME: %s", tmp); - pmi_time = 500; + pmi_time = DEFAULT_PMI_TIME; } } @@ -123,9 +184,11 @@ int slurm_send_kvs_comm_set(struct kvs_comm_set *kvs_set_ptr, * command is very overloaded. * We also increase the timeout (default timeout is * 10 secs). */ - usleep(pmi_rank * pmi_time); - if (pmi_size > 1000) /* 100 secs */ - timeout = slurm_get_msg_timeout() * 10000; + _delay_rpc(pmi_rank, pmi_size); + if (pmi_size > 4000) /* 240 secs */ + timeout = slurm_get_msg_timeout() * 24000; + else if (pmi_size > 1000) /* 120 secs */ + timeout = slurm_get_msg_timeout() * 12000; else if (pmi_size > 100) /* 50 secs */ timeout = slurm_get_msg_timeout() * 5000; else if (pmi_size > 10) /* 20 secs */ @@ -135,8 +198,9 @@ int slurm_send_kvs_comm_set(struct kvs_comm_set *kvs_set_ptr, if (retries++ > MAX_RETRIES) { error("slurm_send_kvs_comm_set: %m"); return SLURM_ERROR; - } - usleep(pmi_rank * pmi_time); + } else + debug("send_kvs retry %d", retries); + _delay_rpc(pmi_rank, pmi_size); } return rc; @@ -153,7 +217,7 @@ int slurm_get_kvs_comm_set(struct kvs_comm_set **kvs_set_ptr, uint16_t port; kvs_get_msg_t data; char *env_pmi_ifhn; - + if (kvs_set_ptr == NULL) return EINVAL; *kvs_set_ptr = NULL; /* initialization */ @@ -203,18 +267,14 @@ int slurm_get_kvs_comm_set(struct kvs_comm_set **kvs_set_ptr, * Also increase the message timeout if many tasks * since the srun command can get very overloaded (the * default timeout is 10 secs). - * - * TaskID SendTime GetTime (Units are PMI_TIME, default=500 usec) - * 0 0 N+0 - * 1 1 N+1 - * 2 2 N+2 - * N-1 N-1 N+N-1 */ - usleep(pmi_size * pmi_time); - if (pmi_size > 1000) /* 100 secs */ - timeout = slurm_get_msg_timeout() * 10000; - else if (pmi_size > 100) /* 50 secs */ - timeout = slurm_get_msg_timeout() * 5000; + _delay_rpc(pmi_rank, pmi_size); + if (pmi_size > 4000) /* 240 secs */ + timeout = slurm_get_msg_timeout() * 24000; + else if (pmi_size > 1000) /* 120 secs */ + timeout = slurm_get_msg_timeout() * 12000; + else if (pmi_size > 100) /* 60 secs */ + timeout = slurm_get_msg_timeout() * 6000; else if (pmi_size > 10) /* 20 secs */ timeout = slurm_get_msg_timeout() * 2000; @@ -222,8 +282,9 @@ int slurm_get_kvs_comm_set(struct kvs_comm_set **kvs_set_ptr, if (retries++ > MAX_RETRIES) { error("slurm_get_kvs_comm_set: %m"); return SLURM_ERROR; - } - usleep(pmi_rank * pmi_time); + } else + debug("get kvs retry %d", retries); + _delay_rpc(pmi_rank, pmi_size); } if (rc != SLURM_SUCCESS) { error("slurm_get_kvs_comm_set error_code=%d", rc); @@ -263,7 +324,8 @@ int slurm_get_kvs_comm_set(struct kvs_comm_set **kvs_set_ptr, } /* Forward keypair info to other tasks as required. -* Clear message forward structure upon completion. */ + * Clear message forward structure upon completion. + * The messages are forwarded sequentially. */ static int _forward_comm_set(struct kvs_comm_set *kvs_set_ptr) { int i, rc = SLURM_SUCCESS; diff --git a/src/common/slurm_protocol_api.c b/src/common/slurm_protocol_api.c index 22bdf439cbd58d784b7ecbda17769eea9c8ae831..f25b3380997a014e3e7117a0ebbc8fb5ac212898 100644 --- a/src/common/slurm_protocol_api.c +++ b/src/common/slurm_protocol_api.c @@ -986,10 +986,10 @@ int slurm_receive_msg(slurm_fd fd, slurm_msg_t *msg, int timeout) timeout = slurm_get_msg_timeout() * 1000; else if(timeout > (slurm_get_msg_timeout() * 10000)) { - error("You are sending a message with very long " + debug("You are receiving a message with very long " "timeout of %d seconds", (timeout/1000)); } else if(timeout < 1000) { - error("You are sending a message with a very short " + error("You are receiving a message with a very short " "timeout of %d msecs", timeout); } @@ -1132,7 +1132,7 @@ List slurm_receive_msgs(slurm_fd fd, int steps, int timeout) debug4("orig_timeout was %d we have %d steps and a timeout of %d", orig_timeout, steps, timeout); if(orig_timeout >= (slurm_get_msg_timeout() * 10000)) { - error("slurm_receive_msgs: " + debug("slurm_receive_msgs: " "You are sending a message with timeout's greater " "than %d seconds, your's is %d seconds", (slurm_get_msg_timeout() * 10), @@ -1293,7 +1293,7 @@ int slurm_receive_msg_and_forward(slurm_fd fd, slurm_addr *orig_addr, timeout = slurm_get_msg_timeout() * 1000; if(timeout >= (slurm_get_msg_timeout() * 10000)) { - error("slurm_receive_msg_and_forward: " + debug("slurm_receive_msg_and_forward: " "You are sending a message with timeout's greater " "than %d seconds, your's is %d seconds", (slurm_get_msg_timeout() * 10), diff --git a/src/common/timers.c b/src/common/timers.c index 949382e7e3d7bb2569b22be4c9d64916707ecbb5..421cdae91824652024907fb9eef8a8c7100f56e6 100644 --- a/src/common/timers.c +++ b/src/common/timers.c @@ -54,7 +54,7 @@ inline void diff_tv_str(struct timeval *tv1,struct timeval *tv2, delta_t += tv2->tv_usec - tv1->tv_usec; snprintf(tv_str, len_tv_str, "usec=%ld", delta_t); if (delta_t > 1000000) - info("Warning: Note very large processing time: %s",tv_str); + verbose("Warning: Note very large processing time: %s",tv_str); } /* diff --git a/src/srun/opt.c b/src/srun/opt.c index e5541dce4a171a23432a6c49c626aefa3af8e939..992c3efe14e334663d6b71aed91f23644c5f9752 100644 --- a/src/srun/opt.c +++ b/src/srun/opt.c @@ -751,6 +751,7 @@ env_vars_t env_vars[] = { {"SLURM_STDERRMODE", OPT_STRING, &opt.efname, NULL }, {"SLURM_STDINMODE", OPT_STRING, &opt.ifname, NULL }, {"SLURM_STDOUTMODE", OPT_STRING, &opt.ofname, NULL }, +{"SLURM_THREADS", OPT_INT, &opt.max_threads, NULL }, {"SLURM_TIMELIMIT", OPT_STRING, &opt.time_limit_str,NULL }, {"SLURM_CHECKPOINT", OPT_STRING, &opt.ckpt_interval_str, NULL }, {"SLURM_WAIT", OPT_INT, &opt.max_wait, NULL }, diff --git a/testsuite/expect/regression b/testsuite/expect/regression index 9d3db8ac46652199c713d53767be6804ec1a5588..4a386b89cb3a7112a989fce243b294e6d6ab57c4 100755 --- a/testsuite/expect/regression +++ b/testsuite/expect/regression @@ -56,7 +56,7 @@ for major in `seq 1 20`; do TEST=test${major}.${minor} if [ ! -f $TEST ]; then continue; fi - expect $TEST + $TEST if [ $? -eq 0 ] then COMPLETIONS=$((COMPLETIONS+1))