diff --git a/NEWS b/NEWS index a815f3ac6b74c54f250e00abe26fd88e3eee1167..e7d6c1c4b82442e89961850f9a84f76ea82cb5ac 100644 --- a/NEWS +++ b/NEWS @@ -39,6 +39,9 @@ documents those changes that are of interest to users and admins. -- Fix for job request with GRES count of zero. -- Fix a potential memory leak in hostlist. -- Job array dependency logic: Cache results for major performance improvement. + -- Add a new environment variable PMI2_CONNECT_TO_SERVER. If set in the MPI application + environment the PMI2 library will connect the the PMI2 server (slurmstepd) instead + of using the provided PMI_FD socket. * Changes in Slurm 14.03.0pre6 ============================== diff --git a/contribs/pmi2/pmi2_api.c b/contribs/pmi2/pmi2_api.c index 9e7354a707194465965635c07480b078e350968c..fe16761c5702d70f31b8de781b1c053c82a6ef0b 100644 --- a/contribs/pmi2/pmi2_api.c +++ b/contribs/pmi2/pmi2_api.c @@ -41,6 +41,12 @@ static int PMI2_fd = -1; static int PMI2_size = 1; static int PMI2_rank = 0; +/* By default the pmi2 library connects back on a socket + * provided by the pmi2 server. However that socket can be closed + * for example by a shell running fork()/exec() in between. + * In that case the library must connect back to the pmi2 server. + */ +static int _connect_to_stepd(int); /* XXX DJG the "const"s on both of these functions and the Keyvalpair * struct are wrong in the isCopy==TRUE case! */ @@ -212,6 +218,10 @@ int PMI2_Init(int *spawned, int *size, int *rank, int *appnum) goto fn_exit; } + if (getenv("PMI2_CONNECT_TO_SERVER")) { + PMI2_fd = _connect_to_stepd(PMI2_fd); + } + /* do initial PMI1 init */ ret = snprintf(buf, PMI2_MAXLINE, "cmd=init pmi_version=%d pmi_subversion=%d\n", PMI_VERSION, PMI_SUBVERSION); PMI2U_ERR_CHKANDJUMP(ret < 0, pmi2_errno, PMI2_ERR_OTHER, "**intern %s", "failed to generate init line"); @@ -1934,3 +1944,67 @@ static void dump_PMI2_Command(PMI2_Command *cmd) for (i = 0; i < cmd->nPairs; ++i) dump_PMI2_Keyvalpair(cmd->pairs[i]); } + +/* _connect_to_stepd() + * + * If the user requests PMI2_CONNECT_TO_SERVER do + * connect over the PMI2_SUN_PATH unix socket. + */ +static int +_connect_to_stepd(int s) +{ + struct sockaddr_un addr; + int cc; + char *usock; + char *p; + int myrank; + int n; + + usock = getenv("PMI2_SUN_PATH"); + if (usock == NULL) + return -1; + + cc = socket(PF_UNIX, SOCK_STREAM, 0); + if (cc < 0) { + perror("socket()"); + return -1; + } + + memset(&addr, 0, sizeof(struct sockaddr_un)); + + addr.sun_family = AF_UNIX; + sprintf(addr.sun_path, usock); + + if (connect(cc, (struct sockaddr *)&addr, + sizeof(struct sockaddr_un)) != 0) { + perror("connect()"); + close(cc); + return -1; + } + + /* The very first thing we have to tell the pmi + * server is our rank, so he can associate our + * file descriptor with our rank. + */ + p = getenv("PMI_RANK"); + if (p == NULL) { + fprintf(stderr, "%s: failed to get PMI_RANK from env\n", __func__); + close(cc); + return -1; + } + + myrank = atoi(p); + n = write(cc, &myrank, sizeof(int)); + if (n != sizeof(int)) { + perror("write()"); + close(cc); + return -1; + } + + /* close() all socket and return + * the new. + */ + close(s); + + return cc; +} diff --git a/src/plugins/mpi/pmi2/agent.c b/src/plugins/mpi/pmi2/agent.c index 1cf7278f3a3646cb72e60fb816755712fc2a7e8b..01ec20e65b54e3c111e415219b99641eb909666c 100644 --- a/src/plugins/mpi/pmi2/agent.c +++ b/src/plugins/mpi/pmi2/agent.c @@ -65,7 +65,7 @@ static int *initialized = NULL; static int *finalized = NULL; - +static eio_handle_t *pmi2_handle; static pthread_t pmi2_agent_tid = 0; static bool _tree_listen_readable(eio_obj_t *obj); @@ -86,6 +86,7 @@ handle_read: &_task_read, static int _handle_pmi1_init(int fd, int lrank); +static int _handle_accept_rank(int); /*********************************************************************/ @@ -164,7 +165,6 @@ _tree_listen_read(eio_obj_t *obj, List objs) struct sockaddr addr; struct sockaddr_in *sin; socklen_t size = sizeof(addr); - char buf[INET_ADDRSTRLEN]; debug2("mpi/pmi2: _tree_listen_read"); @@ -188,16 +188,16 @@ _tree_listen_read(eio_obj_t *obj, List objs) return 0; } - if (! in_stepd()) { - sin = (struct sockaddr_in *) &addr; - inet_ntop(AF_INET, &sin->sin_addr, buf, INET_ADDRSTRLEN); - debug3("mpi/pmi2: accepted tree connection: ip=%s sd=%d", - buf, sd); - } + sin = (struct sockaddr_in *)&addr; + debug2("%s: accepted tree connection: ip %s sd %d", + __func__, inet_ntoa(sin->sin_addr), sd); - /* read command from socket and handle it */ - _handle_tree_request(sd); - close(sd); + if (in_stepd()) { + _handle_accept_rank(sd); + } else { + _handle_tree_request(sd); + close(sd); + } } return 0; } @@ -235,6 +235,8 @@ _task_read(eio_obj_t *obj, List objs) lrank = (int)(long)(obj->arg); rc = _handle_task_request(obj->fd, lrank); + if (rc != SLURM_SUCCESS) + obj->shutdown = true; return rc; } @@ -297,7 +299,6 @@ send_response: static void * _agent(void * unused) { - eio_handle_t *pmi2_handle; eio_obj_t *tree_listen_obj, *task_obj; int i; @@ -370,3 +371,37 @@ task_finalize(int lrank) { finalized[lrank] = 1; } + + +/* _handle_accept_rank() + * + * Handle connect requests from ranks. This happens + * when the pmi2 library does not uses the PMI_FD + * socket but decides to connect to the server by itself. + */ +static int +_handle_accept_rank(int fd) +{ + eio_obj_t *obj; + int cc; + int myrank; + + debug2("%s: going to read() client rank", __func__); + /* called run accept() now wait for the + * client to send us his rank. + */ + cc = read(fd, &myrank, sizeof(int)); + if (cc != sizeof(uint32_t)) { + close(fd); + xfree(myrank); + return -1; + + } + + debug2("%s: got client rank %d on fd %d", __func__, myrank, fd); + + obj = eio_obj_create(fd, &task_ops, (void *)((long)myrank)); + eio_new_initial_obj(pmi2_handle, obj); + + return 0; +} diff --git a/src/plugins/mpi/pmi2/mpi_pmi2.c b/src/plugins/mpi/pmi2/mpi_pmi2.c index 400a32b991ce53376a69a7cd2604eed10e6c2655..931b9f07a2f441ab4de27cfdacac637b63e5dde1 100644 --- a/src/plugins/mpi/pmi2/mpi_pmi2.c +++ b/src/plugins/mpi/pmi2/mpi_pmi2.c @@ -123,6 +123,12 @@ int p_mpi_hook_slurmstepd_task (const mpi_plugin_task_info_t *job, if (job_info.spawn_seq) { /* PMI1.1 needs this env-var */ env_array_overwrite_fmt(env, "PMI_SPAWNED", "%u", 1); } + + /* Set the path to the stepd socket for the pmi + * library to connect to. + */ + env_array_overwrite_fmt(env, "PMI2_SUN_PATH", sun_path); + /* close unused sockets in task */ close(tree_sock); tree_sock = 0; diff --git a/src/plugins/mpi/pmi2/setup.c b/src/plugins/mpi/pmi2/setup.c index fb6565c05cb4467e8187b20df94c9c3991bfad49..4e4a56c694d804894eb0e0567500dd0e0de3d064 100644 --- a/src/plugins/mpi/pmi2/setup.c +++ b/src/plugins/mpi/pmi2/setup.c @@ -47,6 +47,7 @@ #include <signal.h> #include <sys/types.h> #include <sys/un.h> +#include <sys/param.h> #include <poll.h> #include <unistd.h> #include <stdlib.h> @@ -79,6 +80,7 @@ int *task_socks; char tree_sock_addr[128]; pmi2_job_info_t job_info; pmi2_tree_info_t tree_info; +char sun_path[PATH_MAX]; extern bool in_stepd(void) @@ -275,6 +277,9 @@ _setup_stepd_sockets(const stepd_step_rec_t *job, char ***env) job->jobid, job->stepid); unlink(sa.sun_path); /* remove possible old socket */ + memset(sun_path, 0, sizeof(sun_path)); + strncpy(sun_path, sa.sun_path, sizeof(sun_path) - 1); + if (bind(tree_sock, (struct sockaddr *)&sa, SUN_LEN(&sa)) < 0) { error("mpi/pmi2: failed to bind tree socket: %m"); unlink(sa.sun_path); diff --git a/src/plugins/mpi/pmi2/setup.h b/src/plugins/mpi/pmi2/setup.h index fdc8d7ecf23071eca202d9d70fc54ccc8a3ce2e7..7cee74356fb980305173358702cf7b21411cb611 100644 --- a/src/plugins/mpi/pmi2/setup.h +++ b/src/plugins/mpi/pmi2/setup.h @@ -105,6 +105,7 @@ extern pmi2_tree_info_t tree_info; extern char tree_sock_addr[]; extern int tree_sock; extern int *task_socks; +extern char sun_path[]; #define STEPD_PMI_SOCK(lrank) task_socks[lrank * 2] #define TASK_PMI_SOCK(lrank) task_socks[lrank * 2 + 1]