diff --git a/slurm/slurm.h.in b/slurm/slurm.h.in index a3c3f801a6a0ca091c31e8d422c4ff743635c5fe..c322b8cd760600c1e1ccbeec35c054c216f1eec8 100644 --- a/slurm/slurm.h.in +++ b/slurm/slurm.h.in @@ -724,6 +724,8 @@ typedef struct { uint16_t ntasks_per_core; uint16_t task_dist; uint16_t plane_size; + + char *mpi_plugin_name; } slurm_step_launch_params_t; typedef struct { diff --git a/slurm/slurm_errno.h b/slurm/slurm_errno.h index 2ae37f877b4614ee545ddcb04338de2c6a0c9345..6143acfde7d0b9a3cc6b8f8ffc68fc194b718bf0 100644 --- a/slurm/slurm_errno.h +++ b/slurm/slurm_errno.h @@ -94,6 +94,8 @@ enum { SLURM_PROTOCOL_IO_STREAM_VERSION_ERROR, SLURM_PROTOCOL_AUTHENTICATION_ERROR, SLURM_PROTOCOL_INSANE_MSG_LENGTH, + SLURM_MPI_PLUGIN_NAME_INVALID, + SLURM_MPI_PLUGIN_PRELAUNCH_SETUP_FAILED, /* communication failures to/from slurmctld */ SLURMCTLD_COMMUNICATIONS_CONNECTION_ERROR = 1800, diff --git a/src/api/step_launch.c b/src/api/step_launch.c index 09d42c3cc84bfad2b9e9c7835cf77b1115015725..8a21d80584e626c22ce002bc45f37d2c60c8eb13 100644 --- a/src/api/step_launch.c +++ b/src/api/step_launch.c @@ -58,6 +58,7 @@ #include "src/common/forward.h" #include "src/common/plugstack.h" #include "src/common/slurm_cred.h" +#include "src/common/mpi.h" #include "src/api/step_launch.h" #include "src/api/step_ctx.h" @@ -102,36 +103,12 @@ void slurm_step_launch_params_t_init (slurm_step_launch_params_t *ptr) { static slurm_step_io_fds_t fds = SLURM_STEP_IO_FDS_INITIALIZER; + /* Set all values to zero (in other words, "NULL" for pointers) */ memset(ptr, 0, sizeof(slurm_step_launch_params_t)); - /* Values are set to zero or NULL above */ -/* ptr->argc = 0; */ -/* ptr->argv = NULL; */ -/* ptr->envc = 0; */ -/* ptr->env = NULL; */ -/* ptr->cwd = NULL; */ -/* ptr->user_managed_io = false; */ + ptr->buffered_stdio = true; -/* ptr->labelio = false; */ -/* ptr->remote_output_filename = NULL; */ -/* ptr->remote_error_filename = NULL; */ -/* ptr->remote_input_filename = NULL; */ memcpy(&ptr->local_fds, &fds, 
sizeof(fds)); ptr->gid = getgid(); -/* ptr->multi_prog = false; */ -/* ptr->slurmd_debug = 0; */ -/* ptr->parallel_debug = false; */ -/* ptr->task_prolog = NULL; */ -/* ptr->task_epilog = NULL; */ -/* ptr->cpu_bind_type = 0; */ -/* ptr->cpu_bind = NULL; */ -/* ptr->mem_bind_type = 0; */ -/* ptr->mem_bind = NULL; */ -/* ptr->cpus_per_task = 0; */ -/* ptr->ntasks_per_node = 0; */ -/* ptr->ntasks_per_socket = 0; */ -/* ptr->ntasks_per_core = 0; */ -/* ptr->task_dist = 0; */ -/* ptr->plane_size = 0; */ } /* @@ -147,6 +124,7 @@ int slurm_step_launch (slurm_step_ctx ctx, launch_tasks_request_msg_t launch; int i; char **env = NULL; + char **mpi_env = NULL; int rc = SLURM_SUCCESS; debug("Entering slurm_step_launch"); @@ -170,6 +148,23 @@ int slurm_step_launch (slurm_step_ctx ctx, sizeof(slurm_step_launch_callbacks_t)); } + if (mpi_hook_client_init(params->mpi_plugin_name) == SLURM_ERROR) { + slurm_seterrno(SLURM_MPI_PLUGIN_NAME_INVALID); + return SLURM_ERROR; + } + /* Now, hack the step_layout struct if the following is true. + This looks like an ugly hack to support LAM/MPI's lamboot. 
*/ + if (mpi_hook_client_single_task_per_node()) { + for (i = 0; i < ctx->step_resp->step_layout->node_cnt; i++) + ctx->step_resp->step_layout->tasks[i] = 1; + } + if ((ctx->launch_state->mpi_state = + mpi_hook_client_prelaunch(ctx->launch_state->mpi_info, &mpi_env)) + == NULL) { + slurm_seterrno(SLURM_MPI_PLUGIN_PRELAUNCH_SETUP_FAILED); + return SLURM_ERROR; + } + /* Create message receiving sockets and handler thread */ _msg_thr_create(ctx->launch_state, ctx->step_req->node_count); @@ -182,6 +177,8 @@ int slurm_step_launch (slurm_step_ctx ctx, launch.cred = ctx->step_resp->cred; launch.job_step_id = ctx->step_resp->job_step_id; if (params->env == NULL) { + /* if the user didn't specify an environment, grab the + environment of the running process */ env_array_merge(&env, (const char **)environ); } else { env_array_merge(&env, (const char **)params->env); @@ -198,6 +195,9 @@ int slurm_step_launch (slurm_step_ctx ctx, ent->h_addr_list[0]); xfree(launcher_hostname); } + env_array_merge(&env, (const char **)mpi_env); + env_array_free(mpi_env); + launch.envc = envcount(env); launch.env = env; if (params->cwd != NULL) { @@ -284,7 +284,10 @@ int slurm_step_launch (slurm_step_ctx ctx, if (!ctx->launch_state->user_managed_io) { xfree(launch.io_port); } + goto done; fail1: + +done: xfree(launch.complete_nodelist); xfree(launch.cwd); env_array_free(env); @@ -409,6 +412,8 @@ void slurm_step_launch_wait_finish(slurm_step_ctx ctx) client_io_handler_destroy(sls->io.normal); } + mpi_hook_client_fini(sls->mpi_state); + pthread_mutex_unlock(&sls->lock); } @@ -446,6 +451,10 @@ struct step_launch_state *step_launch_state_create(slurm_step_ctx ctx) sls->resp_port = NULL; sls->abort = false; sls->abort_action_taken = false; + sls->mpi_info->jobid = ctx->step_req->job_id; + sls->mpi_info->stepid = ctx->step_resp->job_step_id; + sls->mpi_info->step_layout = ctx->step_resp->step_layout; + sls->mpi_state = NULL; pthread_mutex_init(&sls->lock, NULL); pthread_cond_init(&sls->cond, NULL); } 
@@ -467,7 +476,6 @@ void step_launch_state_destroy(struct step_launch_state *sls) if (sls->resp_port != NULL) { xfree(sls->resp_port); } - /* FIXME - more cleanup needed */ } diff --git a/src/api/step_launch.h b/src/api/step_launch.h index 30bbf8f575cf29474af4e2e0ec2d84c44d942e2f..0b0746c58f0d5fe453f1910556eb8d049c0ab825 100644 --- a/src/api/step_launch.h +++ b/src/api/step_launch.h @@ -41,6 +41,7 @@ #include "src/common/slurm_step_layout.h" #include "src/common/eio.h" #include "src/common/bitstring.h" +#include "src/common/mpi.h" #include "src/api/step_io.h" @@ -73,8 +74,11 @@ struct step_launch_state { client_io_t *normal; user_managed_io_t *user; } io; + slurm_step_layout_t *layout; /* a pointer into the ctx step_resp, do not free */ + mpi_plugin_client_info_t mpi_info[1]; + mpi_plugin_client_state_t *mpi_state; /* user registered callbacks */ slurm_step_launch_callbacks_t callback; diff --git a/src/common/env.c b/src/common/env.c index 13e36b30fce8d4f2f45495023bfa9fba9ac96a20..561c6f7f09e3e320f3abac319477de38cd4fea5d 100644 --- a/src/common/env.c +++ b/src/common/env.c @@ -47,9 +47,9 @@ #include <unistd.h> #include <sys/types.h> +#include "src/common/macros.h" #include "slurm/slurm.h" #include "src/common/log.h" -#include "src/common/macros.h" #include "src/common/env.h" #include "src/common/xassert.h" #include "src/common/xmalloc.h" @@ -61,9 +61,17 @@ * Define slurm-specific aliases for use by plugins, see slurm_xlator.h * for details. 
*/ -strong_alias(setenvf, slurm_setenvpf); -strong_alias(unsetenvp, slurm_unsetenvp); -strong_alias(getenvp, slurm_getenvp); +strong_alias(setenvf, slurm_setenvpf); +strong_alias(unsetenvp, slurm_unsetenvp); +strong_alias(getenvp, slurm_getenvp); +strong_alias(env_array_create, slurm_env_array_create); +strong_alias(env_array_merge, slurm_env_array_merge); +strong_alias(env_array_copy, slurm_env_array_copy); +strong_alias(env_array_free, slurm_env_array_free); +strong_alias(env_array_append, slurm_env_array_append); +strong_alias(env_array_append_fmt, slurm_env_array_append_fmt); +strong_alias(env_array_overwrite, slurm_env_array_overwrite); +strong_alias(env_array_overwrite_fmt, slurm_env_array_overwrite_fmt); /* * Return pointer to `name' entry in environment if found, or diff --git a/src/common/env.h b/src/common/env.h index aa992c021d57aa90990a6bc81672cc004cb02daa..060064d72dd8b86efcc14f0eebb231514746f566 100644 --- a/src/common/env.h +++ b/src/common/env.h @@ -227,8 +227,8 @@ int env_array_overwrite_fmt(char ***array_ptr, const char *name, const char *value_fmt, ...); /* - * Set all of the environment variables in a supplied environment - * variable array. + * Set in the running process's environment all of the environment + * variables in a supplied environment variable array. */ void env_array_set_environment(char **env_array); diff --git a/src/common/fd.c b/src/common/fd.c index 78d91841540f3098b04f97a04870c8d40226fcb8..681b35a8322b0033183101b5efdd0075b884f911 100644 --- a/src/common/fd.c +++ b/src/common/fd.c @@ -47,9 +47,20 @@ #include <fcntl.h> #include <unistd.h> +#include "src/common/macros.h" #include "src/common/fd.h" #include "src/common/log.h" +/* + * Define slurm-specific aliases for use by plugins, see slurm_xlator.h + * for details. 
+ */ +strong_alias(fd_read_n, slurm_fd_read_n); +strong_alias(fd_write_n, slurm_fd_write_n); +strong_alias(fd_set_blocking, slurm_fd_set_blocking); +strong_alias(fd_set_nonblocking,slurm_fd_set_nonblocking); + + static int fd_get_lock(int fd, int cmd, int type); static pid_t fd_test_lock(int fd, int type); diff --git a/src/common/fd.h b/src/common/fd.h index cd450b3f65581cf104ae7b3438f99b45a3d690a1..109735caaa1021da521a69210c1c92cb30619da2 100644 --- a/src/common/fd.h +++ b/src/common/fd.h @@ -47,6 +47,7 @@ #include <sys/types.h> #include <unistd.h> +#include "src/common/macros.h" void fd_set_close_on_exec(int fd); /* diff --git a/src/common/mpi.c b/src/common/mpi.c index d52b56fa237b01011df790f0571bef4e8952bef1..5c8116bf08a0d4859459039823eae3166ef5616d 100644 --- a/src/common/mpi.c +++ b/src/common/mpi.c @@ -53,15 +53,18 @@ /* * WARNING: Do not change the order of these fields or add additional - * fields at the beginning of the structure. If you do, job completion - * logging plugins will stop working. If you need to add fields, add them + * fields at the beginning of the structure. If you do, MPI plugins + * will stop working. If you need to add fields, add them * at the end of the structure. */ typedef struct slurm_mpi_ops { - int (*init) (slurmd_job_t *job, int rank); - int (*create_thread) (srun_job_t *job); - int (*single_task) (void); - int (*exit) (void); + int (*slurmstepd_init) (const mpi_plugin_task_info_t *job, + char ***env); + mpi_plugin_client_state_t * + (*client_prelaunch) (const mpi_plugin_client_info_t *job, + char ***env); + bool (*client_single_task)(void); + int (*client_fini) (mpi_plugin_client_state_t *); } slurm_mpi_ops_t; struct slurm_mpi_context { @@ -134,10 +137,10 @@ _slurm_mpi_get_ops( slurm_mpi_context_t c ) * declared for slurm_mpi_ops_t. 
*/ static const char *syms[] = { - "mpi_p_init", - "mpi_p_thr_create", - "mpi_p_single_task", - "mpi_p_exit" + "p_mpi_hook_slurmstepd_task", + "p_mpi_hook_client_prelaunch", + "p_mpi_hook_client_single_task_per_node", + "p_mpi_hook_client_fini" }; int n_syms = sizeof( syms ) / sizeof( char * ); char *plugin_dir = NULL; @@ -233,63 +236,64 @@ done: return retval; } -int srun_mpi_init (char *mpi_type) -{ - debug("mpi type = %s", mpi_type); - - if(_mpi_init(mpi_type) == SLURM_ERROR) - return SLURM_ERROR; - - return SLURM_SUCCESS; -} - -int slurmd_mpi_init (slurmd_job_t *job, int rank) +int mpi_hook_slurmstepd_task (const mpi_plugin_task_info_t *job, char ***env) { - char *mpi_type = getenvp (job->env, "SLURM_MPI_TYPE"); + char *mpi_type = getenvp (*env, "SLURM_MPI_TYPE"); debug("mpi type = %s", mpi_type); if(_mpi_init(mpi_type) == SLURM_ERROR) return SLURM_ERROR; - unsetenvp (job->env, "SLURM_MPI_TYPE"); - return (*(g_context->ops.init))(job, rank); + unsetenvp (*env, "SLURM_MPI_TYPE"); + return (*(g_context->ops.slurmstepd_init))(job, env); } -int mpi_fini (void) +int mpi_hook_client_init (char *mpi_type) { - int rc; - - if (!g_context) - return SLURM_SUCCESS; - - rc = _slurm_mpi_context_destroy(g_context); - return rc; + debug("mpi type = %s", mpi_type); + + if(_mpi_init(mpi_type) == SLURM_ERROR) + return SLURM_ERROR; + + return SLURM_SUCCESS; } -int slurm_mpi_thr_create(srun_job_t *job) +mpi_plugin_client_state_t * +mpi_hook_client_prelaunch(const mpi_plugin_client_info_t *job, char ***env) { if (_mpi_init(NULL) < 0) - return SLURM_ERROR; + return NULL; - return (*(g_context->ops.create_thread))(job); + return (*(g_context->ops.client_prelaunch))(job, env); } -int slurm_mpi_single_task_per_node (void) +bool mpi_hook_client_single_task_per_node (void) { if (_mpi_init(NULL) < 0) return SLURM_ERROR; - return (*(g_context->ops.single_task))(); + return (*(g_context->ops.client_single_task))(); } -int slurm_mpi_exit (void) +int mpi_hook_client_fini 
(mpi_plugin_client_state_t *state) { if (_mpi_init(NULL) < 0) return SLURM_ERROR; - return (*(g_context->ops.exit))(); + return (*(g_context->ops.client_fini))(state); +} + +int mpi_fini (void) +{ + int rc; + + if (!g_context) + return SLURM_SUCCESS; + + rc = _slurm_mpi_context_destroy(g_context); + return rc; } diff --git a/src/common/mpi.h b/src/common/mpi.h index 029d296f9cb9be7041dda12f481e13b073a332b8..dff76cbba8cf9d2d5959c562179ea67d3249eb1f 100644 --- a/src/common/mpi.h +++ b/src/common/mpi.h @@ -42,17 +42,95 @@ # include "config.h" #endif -#include "src/srun/srun_job.h" -#include "src/slurmd/slurmstepd/slurmstepd_job.h" +#include <stdbool.h> +#include <slurm/slurm.h> typedef struct slurm_mpi_context *slurm_mpi_context_t; +typedef void mpi_plugin_client_state_t; -int srun_mpi_init (char *mpi_type); -int slurmd_mpi_init (slurmd_job_t *job, int rank); -int mpi_fini (void); -int slurm_mpi_thr_create(srun_job_t *job); -int slurm_mpi_single_task_per_node (void); -int slurm_mpi_exit (void); +typedef struct { + uint32_t jobid; + uint32_t stepid; + slurm_step_layout_t *step_layout; +} mpi_plugin_client_info_t; + +typedef struct { + uint32_t jobid; /* Current SLURM job id */ + uint32_t stepid; /* Current step id (or NO_VAL) */ + uint32_t nnodes; /* number of nodes in current job step */ + uint32_t nodeid; /* relative position of this node in job */ + uint32_t ntasks; /* total number of tasks in current job */ + uint32_t ltasks; /* number of tasks on *this* (local) node */ + + uint32_t gtaskid;/* global task rank within the job step */ + int ltaskid;/* task rank within the local node */ + + slurm_addr *self; + slurm_addr *client; +} mpi_plugin_task_info_t; + +/********************************************************************** + * Hooks called by the slurmd and/or slurmstepd. + **********************************************************************/ + +/* + * Load the plugin and call the plugin p_mpi_hook_slurmstepd_task() function. 
+ * + * This function is called from within each process that will exec() a + * task. The process will be running as the user of the job step at that + * point. + * + * If the plugin wants to set environment variables for the task, + * it will add the necessary variables to the env array pointed + * to by "env". If "env" is NULL, a new array will be allocated + * automatically. + * + * The returned "env" array may be manipulated (and freed) by using + * the src/common/env.c:env_array_* functions. + */ +int mpi_hook_slurmstepd_task (const mpi_plugin_task_info_t *job, char ***env); + +/********************************************************************** + * Hooks called by client applications. + * For instance: srun, slaunch, slurm_step_launch(). + **********************************************************************/ +/* + * Just load the requested plugin. No explicit calls into the plugin + * once loaded (just the implicit call to the plugin's init() function). + * + * If "mpi_type" is NULL, the system-default mpi plugin + * is initialized. + */ +int mpi_hook_client_init (char *mpi_type); + +/* + * Call the plugin p_mpi_hook_client_prelaunch() function. + * + * If the plugin requires that environment variables be set in the + * environment of every task, it will add the necessary variables + * to the env array pointed to by "env". If "env" is NULL, a new + * array will be allocated automatically. + * + * The returned "env" array may be manipulated (and freed) by using + * the src/common/env.c:env_array_* functions. + * + * Returns NULL on error. On success returns an opaque pointer + * to MPI state for this job step. Free the state by calling + * mpi_hook_client_fini(). + */ +mpi_plugin_client_state_t * +mpi_hook_client_prelaunch(const mpi_plugin_client_info_t *job, char ***env); + +/* Call the plugin p_mpi_hook_client_single_task_per_node() function. 
*/ +bool mpi_hook_client_single_task_per_node (void); + +/* Call the plugin p_mpi_hook_client_fini() function. */ +int mpi_hook_client_fini (mpi_plugin_client_state_t *state); + +/********************************************************************** + * FIXME - Nobody calls the following function. Perhaps someone should. + **********************************************************************/ +int mpi_fini (void); #endif /* !_SRUN_MPI_H */ diff --git a/src/common/net.c b/src/common/net.c index affb603884fa84d12c15f8ea24d55b6dc844c7fb..59a6329dc5b7e9c533c220a19935146e6882e59d 100644 --- a/src/common/net.c +++ b/src/common/net.c @@ -50,9 +50,18 @@ #include <errno.h> #include <stdint.h> +#include "src/common/macros.h" #include "src/common/log.h" #include "src/common/net.h" +/* + * Define slurm-specific aliases for use by plugins, see slurm_xlator.h + * for details. + */ +strong_alias(net_stream_listen, slurm_net_stream_listen); +strong_alias(net_accept_stream, slurm_net_accept_stream); +strong_alias(net_set_low_water, slurm_net_set_low_water); + #ifndef NET_DEFAULT_BACKLOG # define NET_DEFAULT_BACKLOG 1024 #endif @@ -112,7 +121,7 @@ int net_stream_listen(int *fd, short *port) } -int accept_stream(int fd) +int net_accept_stream(int fd) { int sd; @@ -129,7 +138,6 @@ int accept_stream(int fd) return sd; } - int readn(int fd, void *buf, size_t nbytes) { int n = 0; diff --git a/src/common/net.h b/src/common/net.h index e211c9204ad38f5567a14f3ba00414b9db2dd564..94bdf6d8a94945e7a520f6858557d87dfc8c47e5 100644 --- a/src/common/net.h +++ b/src/common/net.h @@ -41,6 +41,8 @@ #include <stdint.h> +#include "src/common/macros.h" + /* open a stream socket on an ephemereal port and put it into * the listen state. fd and port are filled in with the new * socket's file descriptor and port #. 
diff --git a/src/common/plugstack.c b/src/common/plugstack.c index d46441043d8584b1706d3b69bec3547168ce54c1..2c9f30e6a9184276ff679d3469607ad0f95eff36 100644 --- a/src/common/plugstack.c +++ b/src/common/plugstack.c @@ -54,7 +54,7 @@ #include "src/common/job_options.h" #include "src/slurmd/slurmstepd/slurmstepd_job.h" -#include "src/srun/srun_job.h" +/*#include "src/srun/srun_job.h"*/ #include <slurm/spank.h> diff --git a/src/common/slurm_errno.c b/src/common/slurm_errno.c index 2fc5d9902ba5fdb00504c55ccc6699d1dd359a88..b632b528b032f701de4691370bfc6c2a34815872 100644 --- a/src/common/slurm_errno.c +++ b/src/common/slurm_errno.c @@ -87,6 +87,10 @@ static slurm_errtab_t slurm_errtab[] = { "Protocol authentication error" }, { SLURM_PROTOCOL_INSANE_MSG_LENGTH, "Insane message length" }, + { SLURM_MPI_PLUGIN_NAME_INVALID, + "Invalid MPI plugin name" }, + { SLURM_MPI_PLUGIN_PRELAUNCH_SETUP_FAILED, + "MPI plugin's pre-launch setup failed" }, /* communication failures to/from slurmctld */ { SLURMCTLD_COMMUNICATIONS_CONNECTION_ERROR, diff --git a/src/common/slurm_xlator.h b/src/common/slurm_xlator.h index 32c426711061e494900cc5246bb3b3b893dee22d..8e38ed87b90c8d47563ed845d49cbccbe5c8ca5b 100644 --- a/src/common/slurm_xlator.h +++ b/src/common/slurm_xlator.h @@ -8,7 +8,7 @@ * the SLURM function. By renaming the functions, inappropriate linking * should be avoided. * - * All SLURM functions referenced from the switch and auth plugins should + * All SLURM functions referenced from the switch, auth, and mpi plugins should * have aliases established. Functions not referenced from the plugins * need not be aliased. 
* @@ -113,6 +113,12 @@ #define bit_nffs slurm_bit_nffs #define bit_copybits slurm_bit_copybits +/* fd.[ch] functions */ +#define fd_read_n slurm_fd_read_n +#define fd_write_n slurm_fd_write_n +#define fd_set_blocking slurm_fd_set_blocking +#define fd_set_nonblocking slurm_fd_set_nonblocking + /* hostlist.[ch] functions */ #define hostlist_create slurm_hostlist_create #define hostlist_copy slurm_hostlist_copy @@ -203,6 +209,11 @@ * None exported today. * The header file used only for #define values. */ +/* net.[ch] functions */ +#define net_stream_listen slurm_net_stream_listen +#define net_accept_stream slurm_net_accept_stream +#define net_set_low_water slurm_net_set_low_water + /* pack.[ch] functions */ #define create_buf slurm_create_buf #define free_buf slurm_free_buf @@ -233,6 +244,15 @@ #define setenvf slurm_setenvpf #define unsetenvp slurm_unsetenvp #define getenvp slurm_getenvp +#define env_array_create slurm_env_array_create +#define env_array_merge slurm_env_array_merge +#define env_array_copy slurm_env_array_copy +#define env_array_free slurm_env_array_free +#define env_array_append slurm_env_array_append +#define env_array_append_fmt slurm_env_array_append_fmt +#define env_array_overwrite slurm_env_array_overwrite +#define env_array_overwrite_fmt slurm_env_array_overwrite_fmt + /* slurm_auth.[ch] functions * None exported today. 
diff --git a/src/plugins/mpi/lam/lam.h b/src/plugins/mpi/lam/lam.h index 03ad0b21191e594a067edad85edbcc95d9af1a85..ab493a71e05f1b14964e665510f0a555bc697340 100644 --- a/src/plugins/mpi/lam/lam.h +++ b/src/plugins/mpi/lam/lam.h @@ -39,8 +39,8 @@ # include "config.h" #endif -#include "src/srun/srun_job.h" -#include "src/slurmd/slurmstepd/slurmstepd_job.h" +#include "src/common/slurm_xlator.h" +#include "src/common/mpi.h" #include "src/common/env.h" -//extern int lam_thr_create(srun_job_t *job); +//extern int lam_thr_create(mpi_plugin_client_info_t *job); diff --git a/src/plugins/mpi/lam/mpi_lam.c b/src/plugins/mpi/lam/mpi_lam.c index 795991f63bad6ba95da671ecae7d721b82a5c382..c366fb6aa5776d23765cd0b1638696b94a1f2fb1 100644 --- a/src/plugins/mpi/lam/mpi_lam.c +++ b/src/plugins/mpi/lam/mpi_lam.c @@ -79,24 +79,27 @@ const char plugin_name[] = "mpi LAM plugin"; const char plugin_type[] = "mpi/lam"; const uint32_t plugin_version = 100; -int mpi_p_init(slurmd_job_t *job) +int p_mpi_hook_slurmstepd_task(const mpi_plugin_task_info_t *job, + char ***env) { debug("Using mpi/lam"); return SLURM_SUCCESS; } -int mpi_p_thr_create(srun_job_t *job) +mpi_plugin_client_state_t * +p_mpi_hook_client_prelaunch(const mpi_plugin_client_info_t *job, char ***env) { debug("Using mpi/lam"); - return SLURM_SUCCESS; + /* only return NULL on error */ + return (void *)0xdeadbeef; } -int mpi_p_single_task() +int p_mpi_hook_client_single_task_per_node() { return true; } -int mpi_p_exit() +int p_mpi_hook_client_fini(mpi_plugin_client_state_t *state) { return SLURM_SUCCESS; } diff --git a/src/plugins/mpi/mpichgm/Makefile.am b/src/plugins/mpi/mpichgm/Makefile.am index 06317f6f21d0141c7b2dd14849bf0be2eda83d19..8e281e6c35d5951012bdf1a419107f7536cd273b 100644 --- a/src/plugins/mpi/mpichgm/Makefile.am +++ b/src/plugins/mpi/mpichgm/Makefile.am @@ -10,6 +10,5 @@ pkglib_LTLIBRARIES = mpi_mpichgm.la # Null switch plugin. 
mpi_mpichgm_la_SOURCES = mpi_mpichgm.c mpichgm.c mpichgm.h\ - $(top_srcdir)/src/srun/srun_job.c \ - $(top_srcdir)/src/common/net.c + $(top_srcdir)/src/common/mpi.h mpi_mpichgm_la_LDFLAGS = $(SO_LDFLAGS) $(PLUGIN_FLAGS) diff --git a/src/plugins/mpi/mpichgm/Makefile.in b/src/plugins/mpi/mpichgm/Makefile.in index 1abec11240aebef3efd6de16c6f9710d4205e20e..1a7ec3e24bd49d3c54cfdf5cbc26956e959237f6 100644 --- a/src/plugins/mpi/mpichgm/Makefile.in +++ b/src/plugins/mpi/mpichgm/Makefile.in @@ -78,8 +78,7 @@ am__installdirs = "$(DESTDIR)$(pkglibdir)" pkglibLTLIBRARIES_INSTALL = $(INSTALL) LTLIBRARIES = $(pkglib_LTLIBRARIES) mpi_mpichgm_la_LIBADD = -am_mpi_mpichgm_la_OBJECTS = mpi_mpichgm.lo mpichgm.lo srun_job.lo \ - net.lo +am_mpi_mpichgm_la_OBJECTS = mpi_mpichgm.lo mpichgm.lo mpi_mpichgm_la_OBJECTS = $(am_mpi_mpichgm_la_OBJECTS) DEFAULT_INCLUDES = -I. -I$(srcdir) -I$(top_builddir) -I$(top_builddir)/slurm depcomp = $(SHELL) $(top_srcdir)/auxdir/depcomp @@ -282,8 +281,7 @@ pkglib_LTLIBRARIES = mpi_mpichgm.la # Null switch plugin. 
mpi_mpichgm_la_SOURCES = mpi_mpichgm.c mpichgm.c mpichgm.h\ - $(top_srcdir)/src/srun/srun_job.c \ - $(top_srcdir)/src/common/net.c + $(top_srcdir)/src/common/mpi.h mpi_mpichgm_la_LDFLAGS = $(SO_LDFLAGS) $(PLUGIN_FLAGS) all: all-am @@ -357,8 +355,6 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/mpi_mpichgm.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/mpichgm.Plo@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/net.Plo@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/srun_job.Plo@am__quote@ .c.o: @am__fastdepCC_TRUE@ if $(COMPILE) -MT $@ -MD -MP -MF "$(DEPDIR)/$*.Tpo" -c -o $@ $<; \ @@ -381,20 +377,6 @@ distclean-compile: @AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCC_FALSE@ $(LTCOMPILE) -c -o $@ $< -srun_job.lo: $(top_srcdir)/src/srun/srun_job.c -@am__fastdepCC_TRUE@ if $(LIBTOOL) --mode=compile --tag=CC $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT srun_job.lo -MD -MP -MF "$(DEPDIR)/srun_job.Tpo" -c -o srun_job.lo `test -f '$(top_srcdir)/src/srun/srun_job.c' || echo '$(srcdir)/'`$(top_srcdir)/src/srun/srun_job.c; \ -@am__fastdepCC_TRUE@ then mv -f "$(DEPDIR)/srun_job.Tpo" "$(DEPDIR)/srun_job.Plo"; else rm -f "$(DEPDIR)/srun_job.Tpo"; exit 1; fi -@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='$(top_srcdir)/src/srun/srun_job.c' object='srun_job.lo' libtool=yes @AMDEPBACKSLASH@ -@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ -@am__fastdepCC_FALSE@ $(LIBTOOL) --mode=compile --tag=CC $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o srun_job.lo `test -f '$(top_srcdir)/src/srun/srun_job.c' || echo '$(srcdir)/'`$(top_srcdir)/src/srun/srun_job.c - -net.lo: $(top_srcdir)/src/common/net.c -@am__fastdepCC_TRUE@ if $(LIBTOOL) --mode=compile --tag=CC $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) 
$(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT net.lo -MD -MP -MF "$(DEPDIR)/net.Tpo" -c -o net.lo `test -f '$(top_srcdir)/src/common/net.c' || echo '$(srcdir)/'`$(top_srcdir)/src/common/net.c; \ -@am__fastdepCC_TRUE@ then mv -f "$(DEPDIR)/net.Tpo" "$(DEPDIR)/net.Plo"; else rm -f "$(DEPDIR)/net.Tpo"; exit 1; fi -@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='$(top_srcdir)/src/common/net.c' object='net.lo' libtool=yes @AMDEPBACKSLASH@ -@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ -@am__fastdepCC_FALSE@ $(LIBTOOL) --mode=compile --tag=CC $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o net.lo `test -f '$(top_srcdir)/src/common/net.c' || echo '$(srcdir)/'`$(top_srcdir)/src/common/net.c - mostlyclean-libtool: -rm -f *.lo diff --git a/src/plugins/mpi/mpichgm/mpi_mpichgm.c b/src/plugins/mpi/mpichgm/mpi_mpichgm.c index cf4728d27fdc06d6159c8db450ae20c88bddd1b3..408d8b27beee627c1ea733cc0312a14966c2b5eb 100644 --- a/src/plugins/mpi/mpichgm/mpi_mpichgm.c +++ b/src/plugins/mpi/mpichgm/mpi_mpichgm.c @@ -79,38 +79,40 @@ const char plugin_name[] = "mpi MPICH-GM plugin"; const char plugin_type[] = "mpi/mpichgm"; const uint32_t plugin_version = 100; -int mpi_p_init(slurmd_job_t *job, int rank) +int p_mpi_hook_slurmstepd_task(const mpi_plugin_task_info_t *job, + char ***env) { char addrbuf[1024]; char *p; - char *addr = getenvp (job->env, "SLURM_LAUNCH_NODE_IPADDR"); + char *addr = getenvp(*env, "SLURM_LAUNCH_NODE_IPADDR"); debug("Using mpi/mpich-gm"); - slurm_print_slurm_addr (job->envtp->self, addrbuf, sizeof(addrbuf)); + slurm_print_slurm_addr (job->self, addrbuf, sizeof(addrbuf)); if ((p = strchr (addrbuf, ':')) != NULL) *p = '\0'; - setenvf (&job->env, "GMPI_MASTER", "%s", addr); - setenvf (&job->env, "GMPI_SLAVE", "%s", addrbuf); - setenvf (&job->env, "GMPI_ID", "%d", rank); - debug2("init for mpi rank %d\n", rank); + env_array_overwrite_fmt(env, "GMPI_MASTER", "%s", addr); + 
env_array_overwrite_fmt(env, "GMPI_SLAVE", "%s", addrbuf); + env_array_overwrite_fmt(env, "GMPI_ID", "%u", job->gtaskid); + debug2("init for mpi rank %u\n", job->gtaskid); return SLURM_SUCCESS; } -int mpi_p_thr_create(srun_job_t *job) +mpi_plugin_client_state_t * +p_mpi_hook_client_prelaunch(mpi_plugin_client_info_t *job, char ***env) { debug("Using mpi/mpich-gm"); - return gmpi_thr_create(job); + return (mpi_plugin_client_state_t *)gmpi_thr_create(job, env); } -int mpi_p_single_task() +int p_mpi_hook_client_single_task_per_node() { return false; } -int mpi_p_exit() +int p_mpi_hook_client_fini(mpi_plugin_client_state_t *state) { - return SLURM_SUCCESS; + return gmpi_thr_destroy((gmpi_state_t *)state); } diff --git a/src/plugins/mpi/mpichgm/mpichgm.c b/src/plugins/mpi/mpichgm/mpichgm.c index 079dfc4a75b365237c591778aa14ab808f58f5b4..9adbf252f0c6a63ecb503872727bcdb8d92147ff 100644 --- a/src/plugins/mpi/mpichgm/mpichgm.c +++ b/src/plugins/mpi/mpichgm/mpichgm.c @@ -51,11 +51,11 @@ #include <netinet/in.h> #include <strings.h> +#include "src/common/slurm_xlator.h" #include "src/common/xmalloc.h" #include "src/common/xstring.h" #include "src/common/net.h" -#include "src/srun/srun_job.h" -#include "src/srun/opt.h" +#include "src/common/mpi.h" #include "src/plugins/mpi/mpichgm/mpichgm.h" @@ -71,10 +71,13 @@ typedef struct { #define GMPI_RECV_BUF_LEN 65536 +struct gmpi_state { + pthread_t tid; + int fd; /* = -1 */ + mpi_plugin_client_info_t *job; +}; -static int gmpi_fd = -1; - -static int _gmpi_parse_init_recv_msg(srun_job_t *job, char *rbuf, +static int _gmpi_parse_init_recv_msg(mpi_plugin_client_info_t *job, char *rbuf, gm_slave_t *slave_data, int *ii) { unsigned int magic, id, port_board_id, unique_high_id, @@ -94,7 +97,7 @@ static int _gmpi_parse_init_recv_msg(srun_job_t *job, char *rbuf, error("GMPI master received invalid magic number"); return -1; } - if (id >= job->ntasks) + if (id >= job->step_layout->task_cnt) fatal("GMPI id is out of range"); if (port_board_id == 
0) fatal("MPI id=%d was unable to open a GM port", id); @@ -120,8 +123,9 @@ static int _gmpi_parse_init_recv_msg(srun_job_t *job, char *rbuf, } -static int _gmpi_establish_map(srun_job_t *job) +static int _gmpi_establish_map(gmpi_state_t *st) { + mpi_plugin_client_info_t *job = st->job; struct sockaddr_in addr; in_addr_t *iaddrs; socklen_t addrlen; @@ -135,9 +139,9 @@ static int _gmpi_establish_map(srun_job_t *job) * Collect info from slaves. * Will never finish unless slaves are GMPI processes. */ - accfd = gmpi_fd; + accfd = st->fd; addrlen = sizeof(addr); - nprocs = job->ntasks; + nprocs = job->step_layout->task_cnt; iaddrs = (in_addr_t *)xmalloc(sizeof(*iaddrs)*nprocs); slave_data = (gm_slave_t *)xmalloc(sizeof(*slave_data)*nprocs); for (i=0; i<nprocs; i++) @@ -241,9 +245,9 @@ static int _gmpi_establish_map(srun_job_t *job) return 0; } - -static void _gmpi_wait_abort(srun_job_t *job) +static void _gmpi_wait_abort(gmpi_state_t *st) { + mpi_plugin_client_info_t *job = st->job; struct sockaddr_in addr; socklen_t addrlen; int newfd, rlen; @@ -253,7 +257,7 @@ static void _gmpi_wait_abort(srun_job_t *job) rbuf = (char *)xmalloc(GMPI_RECV_BUF_LEN); addrlen = sizeof(addr); while (1) { - newfd = accept(gmpi_fd, (struct sockaddr *)&addr, + newfd = accept(st->fd, (struct sockaddr *)&addr, &addrlen); if (newfd == -1) { fatal("GMPI master failed to accept (abort-wait)"); @@ -278,7 +282,7 @@ static void _gmpi_wait_abort(srun_job_t *job) } close(newfd); debug("Received ABORT message from an MPI process."); - fwd_signal(job, SIGKILL, opt.max_threads); + slurm_signal_job_step(job->jobid, job->stepid, SIGKILL); #if 0 xfree(rbuf); close(jgmpi_fd); @@ -291,25 +295,49 @@ static void _gmpi_wait_abort(srun_job_t *job) static void *_gmpi_thr(void *arg) { - srun_job_t *job; + gmpi_state_t *st; + mpi_plugin_client_info_t *job; - job = (srun_job_t *) arg; + st = (gmpi_state_t *) arg; + job = st->job; debug3("GMPI master thread pid=%lu", (unsigned long) getpid()); - 
_gmpi_establish_map(job); + _gmpi_establish_map(st); debug3("GMPI master thread is waiting for ABORT message."); - _gmpi_wait_abort(job); + _gmpi_wait_abort(st); return (void *)0; } +static gmpi_state_t * +gmpi_state_create(const mpi_plugin_client_info_t *job) +{ + gmpi_state_t *state; + + state = (gmpi_state_t *)xmalloc(sizeof(gmpi_state_t)); + + state->tid = (pthread_t)-1; + state->fd = -1; + *(state->job) = *job; + + return state; +} + +static void +gmpi_state_destroy(gmpi_state_t *st) +{ + xfree(st); +} -extern int gmpi_thr_create(srun_job_t *job) +extern gmpi_state_t * +gmpi_thr_create(const mpi_plugin_client_info_t *job, char ***env) { short port; pthread_attr_t attr; - pthread_t gtid; + gmpi_state_t *st = NULL; + + st = gmpi_state_create(job); /* * It is possible for one to modify the mpirun command in @@ -318,11 +346,12 @@ extern int gmpi_thr_create(srun_job_t *job) * should not override envs nor open the master port. */ if (getenv("GMPI_PORT")) - return (0); + return st; - if (net_stream_listen (&gmpi_fd, &port) < 0) { + if (net_stream_listen (&st->fd, &port) < 0) { error ("Unable to create GMPI listen port: %m"); - return -1; + gmpi_state_destroy(st); + return NULL; } /* @@ -330,19 +359,34 @@ extern int gmpi_thr_create(srun_job_t *job) */ slurm_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if (pthread_create(>id, &attr, &_gmpi_thr, (void *)job)) { + if (pthread_create(&st->tid, &attr, &_gmpi_thr, (void *)job)) { slurm_attr_destroy(&attr); - return -1; + gmpi_state_destroy(st); + return NULL; } slurm_attr_destroy(&attr); - setenvf (NULL, "GMPI_PORT", "%u", port); - setenvf (NULL, "GMPI_MAGIC", "%u", job->jobid); - setenvf (NULL, "GMPI_NP", "%d", job->ntasks); - setenvf (NULL, "GMPI_SHMEM", "1"); + + env_array_overwrite_fmt(env, "GMPI_PORT", "%u", port); + env_array_overwrite_fmt(env, "GMPI_MAGIC", "%u", job->jobid); + env_array_overwrite_fmt(env, "GMPI_NP", "%d", + job->step_layout->task_cnt); + 
env_array_overwrite_fmt(env, "GMPI_SHMEM", "1"); /* FIXME for multi-board config. */ - setenvf (NULL, "GMPI_BOARD", "-1"); + env_array_overwrite_fmt(env, "GMPI_BOARD", "-1"); - debug("Started GMPI master thread (%lu)", (unsigned long) gtid); + debug("Started GMPI master thread (%lu)", (unsigned long) st->tid); - return 0; + return st; +} + +extern int gmpi_thr_destroy(gmpi_state_t *st) +{ + if (st != NULL) { + if (st->tid != (pthread_t)-1) { + pthread_cancel(st->tid); + pthread_join(st->tid, NULL); + } + gmpi_state_destroy(st); + } + return SLURM_SUCCESS; } diff --git a/src/plugins/mpi/mpichgm/mpichgm.h b/src/plugins/mpi/mpichgm/mpichgm.h index 3625697b0f29372754289ec3f92bf76041310347..8f092e832fcf49d6c80986adf76a003c2e38a042 100644 --- a/src/plugins/mpi/mpichgm/mpichgm.h +++ b/src/plugins/mpi/mpichgm/mpichgm.h @@ -39,8 +39,12 @@ # include "config.h" #endif -#include "src/srun/srun_job.h" -#include "src/slurmd/slurmstepd/slurmstepd_job.h" +#include "src/common/slurm_xlator.h" +#include "src/common/mpi.h" #include "src/common/env.h" -extern int gmpi_thr_create(srun_job_t *job); +typedef struct gmpi_state gmpi_state_t; + +extern gmpi_state_t *gmpi_thr_create(const mpi_plugin_client_info_t *job, + char ***env); +extern int gmpi_thr_destroy(gmpi_state_t *state); diff --git a/src/plugins/mpi/mvapich/Makefile.am b/src/plugins/mpi/mvapich/Makefile.am index 08ada9e55a279dd7941187b686cb0f93e846da40..69d6d69d59eb2da4d77f456014dc6d97f72ecb95 100644 --- a/src/plugins/mpi/mvapich/Makefile.am +++ b/src/plugins/mpi/mvapich/Makefile.am @@ -8,9 +8,7 @@ INCLUDES = -I$(top_srcdir) -I$(top_srcdir)/src/common pkglib_LTLIBRARIES = mpi_mvapich.la -# Null switch plugin. 
mpi_mvapich_la_SOURCES = mpi_mvapich.c mvapich.c mvapich.h\ - $(top_srcdir)/src/common/net.c \ - $(top_srcdir)/src/srun/srun_job.c + $(top_srcdir)/src/common/mpi.h mpi_mvapich_la_LDFLAGS = $(SO_LDFLAGS) $(PLUGIN_FLAGS) diff --git a/src/plugins/mpi/mvapich/Makefile.in b/src/plugins/mpi/mvapich/Makefile.in index b6d24d7817460653b907dc425e9c8cef240a248b..ca2e4c5fc0db83ad3299d46069d74025975536a3 100644 --- a/src/plugins/mpi/mvapich/Makefile.in +++ b/src/plugins/mpi/mvapich/Makefile.in @@ -78,8 +78,7 @@ am__installdirs = "$(DESTDIR)$(pkglibdir)" pkglibLTLIBRARIES_INSTALL = $(INSTALL) LTLIBRARIES = $(pkglib_LTLIBRARIES) mpi_mvapich_la_LIBADD = -am_mpi_mvapich_la_OBJECTS = mpi_mvapich.lo mvapich.lo net.lo \ - srun_job.lo +am_mpi_mvapich_la_OBJECTS = mpi_mvapich.lo mvapich.lo mpi_mvapich_la_OBJECTS = $(am_mpi_mvapich_la_OBJECTS) DEFAULT_INCLUDES = -I. -I$(srcdir) -I$(top_builddir) -I$(top_builddir)/slurm depcomp = $(SHELL) $(top_srcdir)/auxdir/depcomp @@ -279,12 +278,11 @@ AUTOMAKE_OPTIONS = foreign PLUGIN_FLAGS = -module -avoid-version --export-dynamic INCLUDES = -I$(top_srcdir) -I$(top_srcdir)/src/common pkglib_LTLIBRARIES = mpi_mvapich.la - -# Null switch plugin. 
mpi_mvapich_la_SOURCES = mpi_mvapich.c mvapich.c mvapich.h\ - $(top_srcdir)/src/common/net.c \ - $(top_srcdir)/src/srun/srun_job.c + $(top_srcdir)/src/common/mpi.h + +#mpi_mvapich_la_LIBADD = $(top_builddir)/src/api/libslurmhelper.la mpi_mvapich_la_LDFLAGS = $(SO_LDFLAGS) $(PLUGIN_FLAGS) all: all-am @@ -357,8 +355,6 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/mpi_mvapich.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/mvapich.Plo@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/net.Plo@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/srun_job.Plo@am__quote@ .c.o: @am__fastdepCC_TRUE@ if $(COMPILE) -MT $@ -MD -MP -MF "$(DEPDIR)/$*.Tpo" -c -o $@ $<; \ @@ -381,20 +377,6 @@ distclean-compile: @AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCC_FALSE@ $(LTCOMPILE) -c -o $@ $< -net.lo: $(top_srcdir)/src/common/net.c -@am__fastdepCC_TRUE@ if $(LIBTOOL) --mode=compile --tag=CC $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT net.lo -MD -MP -MF "$(DEPDIR)/net.Tpo" -c -o net.lo `test -f '$(top_srcdir)/src/common/net.c' || echo '$(srcdir)/'`$(top_srcdir)/src/common/net.c; \ -@am__fastdepCC_TRUE@ then mv -f "$(DEPDIR)/net.Tpo" "$(DEPDIR)/net.Plo"; else rm -f "$(DEPDIR)/net.Tpo"; exit 1; fi -@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='$(top_srcdir)/src/common/net.c' object='net.lo' libtool=yes @AMDEPBACKSLASH@ -@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ -@am__fastdepCC_FALSE@ $(LIBTOOL) --mode=compile --tag=CC $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o net.lo `test -f '$(top_srcdir)/src/common/net.c' || echo '$(srcdir)/'`$(top_srcdir)/src/common/net.c - -srun_job.lo: $(top_srcdir)/src/srun/srun_job.c -@am__fastdepCC_TRUE@ if $(LIBTOOL) --mode=compile --tag=CC $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) 
$(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT srun_job.lo -MD -MP -MF "$(DEPDIR)/srun_job.Tpo" -c -o srun_job.lo `test -f '$(top_srcdir)/src/srun/srun_job.c' || echo '$(srcdir)/'`$(top_srcdir)/src/srun/srun_job.c; \ -@am__fastdepCC_TRUE@ then mv -f "$(DEPDIR)/srun_job.Tpo" "$(DEPDIR)/srun_job.Plo"; else rm -f "$(DEPDIR)/srun_job.Tpo"; exit 1; fi -@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='$(top_srcdir)/src/srun/srun_job.c' object='srun_job.lo' libtool=yes @AMDEPBACKSLASH@ -@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ -@am__fastdepCC_FALSE@ $(LIBTOOL) --mode=compile --tag=CC $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o srun_job.lo `test -f '$(top_srcdir)/src/srun/srun_job.c' || echo '$(srcdir)/'`$(top_srcdir)/src/srun/srun_job.c - mostlyclean-libtool: -rm -f *.lo diff --git a/src/plugins/mpi/mvapich/mpi_mvapich.c b/src/plugins/mpi/mvapich/mpi_mvapich.c index 5500190ef0415cfe3d0d7a3212fe9ead40922e64..0938328362cd3b00687bcbbfbfb7cd3bdad89ee8 100644 --- a/src/plugins/mpi/mvapich/mpi_mvapich.c +++ b/src/plugins/mpi/mvapich/mpi_mvapich.c @@ -48,6 +48,7 @@ #include <slurm/slurm_errno.h> #include "src/common/slurm_xlator.h" #include "src/plugins/mpi/mvapich/mvapich.h" + /* * These variables are required by the generic plugin interface. If they * are not found in the plugin, the plugin loader will ignore it. 
@@ -79,56 +80,58 @@ const char plugin_name[] = "mpi MVAPICH plugin"; const char plugin_type[] = "mpi/mvapich"; const uint32_t plugin_version = 100; -int mpi_p_init (slurmd_job_t *job, int rank) +int p_mpi_hook_slurmstepd_task (const mpi_plugin_task_info_t *job, + char ***env) { int i; char *processes = NULL; - char *addr = getenvp (job->env, "SLURM_LAUNCH_NODE_IPADDR"); + char *addr = getenvp (*env, "SLURM_LAUNCH_NODE_IPADDR"); debug("Using mpi/mvapich"); - setenvf (&job->env, "MPIRUN_HOST", "%s", addr); - setenvf (&job->env, "MPIRUN_RANK", "%d", rank); - setenvf (&job->env, "MPIRUN_MPD", "0"); + env_array_overwrite_fmt(env, "MPIRUN_HOST", "%s", addr); + env_array_overwrite_fmt(env, "MPIRUN_RANK", "%u", job->gtaskid); + env_array_overwrite_fmt(env, "MPIRUN_MPD", "0"); - debug2("init for mpi rank %d\n", rank); + debug2("init for mpi rank %u\n", job->gtaskid); /* * Fake MPIRUN_PROCESSES env var -- we don't need this for * SLURM at this time. (what a waste) */ - for (i = 0; i < job->nprocs; i++) + for (i = 0; i < job->ntasks; i++) xstrcat (processes, "x:"); - setenvf (&job->env, "MPIRUN_PROCESSES", "%s", processes); + env_array_overwrite_fmt(env, "MPIRUN_PROCESSES", "%s", processes); /* * Some mvapich versions will ignore MPIRUN_PROCESSES If * the following env var is set. */ - setenvf (&job->env, "NOT_USE_TOTALVIEW", "1"); + env_array_overwrite_fmt(env, "NOT_USE_TOTALVIEW", "1"); /* * Set VIADEV_ENABLE_AFFINITY=0 so that mvapich doesn't * override SLURM's CPU affinity. 
(Unless this var is * already set in user env) */ - if (!getenvp (job->env, "VIADEV_ENABLE_AFFINITY")) - setenvf (&job->env, "VIADEV_ENABLE_AFFINITY", "0"); + if (!getenvp (*env, "VIADEV_ENABLE_AFFINITY")) + env_array_overwrite_fmt(env, "VIADEV_ENABLE_AFFINITY", "0"); return SLURM_SUCCESS; } -int mpi_p_thr_create(srun_job_t *job) +mpi_plugin_client_state_t * +p_mpi_hook_client_prelaunch(mpi_plugin_client_info_t *job, char ***env) { debug("Using mpi/mvapich"); - return mvapich_thr_create(job); + return (mpi_plugin_client_state_t *)mvapich_thr_create(job, env); } -int mpi_p_single_task() +int p_mpi_hook_client_single_task_per_node() { return false; } -int mpi_p_exit() +int p_mpi_hook_client_fini(mpi_plugin_client_state_t *state) { - return SLURM_SUCCESS; + return mvapich_thr_destroy((mvapich_state_t *)state); } diff --git a/src/plugins/mpi/mvapich/mvapich.c b/src/plugins/mpi/mvapich/mvapich.c index 6df49cb277661f2908c4991cb8d2244908f9cf50..7edc2524c9bf48f1751a7e6560d1549d25df58f2 100644 --- a/src/plugins/mpi/mvapich/mvapich.c +++ b/src/plugins/mpi/mvapich/mvapich.c @@ -52,12 +52,11 @@ #include <sys/poll.h> #include <sys/time.h> +#include "src/common/slurm_xlator.h" #include "src/common/xmalloc.h" #include "src/common/xstring.h" #include "src/common/net.h" #include "src/common/fd.h" -#include "src/srun/srun_job.h" -#include "src/srun/opt.h" /* NOTE: MVAPICH has changed protocols without changing version numbers. * This makes support of MVAPICH very difficult. @@ -85,15 +84,6 @@ } while (0) #endif -/* - * Arguments passed to mvapich support thread. - */ -struct mvapich_args { - srun_job_t *job; /* SRUN job information */ - int fd; /* fd on which to accept new connections */ -}; - - /* * Information read from each MVAPICH process */ @@ -122,15 +112,24 @@ struct mvapich_info /* Globals for the mvapich thread. 
*/ -static struct mvapich_info **mvarray = NULL; -static int mvapich_fd = -1; -static int nprocs = -1; -static int protocol_version = -1; -static int protocol_phase = 0; -static int connect_once = 1; -static int mvapich_verbose = 0; -static int do_timing = 0; +int mvapich_verbose = 0; +/* Per-job step state information. The MPI plugin may be called + * multiple times from the SLURM API's slurm_step_launch() in the + * same process. + */ +struct mvapich_state { + pthread_t tid; + struct mvapich_info **mvarray; + int fd; + int nprocs; + int protocol_version; + int protocol_phase; + int connect_once; + int do_timing; + + mpi_plugin_client_info_t job[1]; +}; #define mvapich_debug(args...) \ do { \ @@ -161,11 +160,11 @@ static void mvapich_info_destroy (struct mvapich_info *mvi) return; } -static int mvapich_requires_pids (void) +static int mvapich_requires_pids (mvapich_state_t *st) { - if ( protocol_version == MVAPICH_VERSION_REQUIRES_PIDS - || protocol_version == 5 - || protocol_version == 6 ) + if ( st->protocol_version == MVAPICH_VERSION_REQUIRES_PIDS + || st->protocol_version == 5 + || st->protocol_version == 6 ) return (1); return (0); } @@ -173,14 +172,14 @@ static int mvapich_requires_pids (void) /* * Return non-zero if protocol version has two phases. 
*/ -static int mvapich_dual_phase (void) +static int mvapich_dual_phase (mvapich_state_t *st) { - return (protocol_version == 5 || protocol_version == 6); + return (st->protocol_version == 5 || st->protocol_version == 6); } -static int mvapich_abort_sends_rank (void) +static int mvapich_abort_sends_rank (mvapich_state_t *st) { - if (protocol_version >= 3) + if (st->protocol_version >= 3) return (1); return (0); } @@ -189,7 +188,8 @@ static int mvapich_abort_sends_rank (void) * Create an mvapich_info object by reading information from * file descriptor `fd' */ -static int mvapich_get_task_info (struct mvapich_info *mvi) +static int mvapich_get_task_info (mvapich_state_t *st, + struct mvapich_info *mvi) { int fd = mvi->fd; @@ -203,7 +203,7 @@ static int mvapich_get_task_info (struct mvapich_info *mvi) return error ("mvapich: Unable to read addr info for rank %d: %m", mvi->rank); - if (!mvapich_requires_pids ()) + if (!mvapich_requires_pids (st)) return (0); if (fd_read_n (fd, &mvi->pidlen, sizeof (int)) <= 0) @@ -236,12 +236,13 @@ static int mvapich_get_hostid (struct mvapich_info *mvi) return (0); } -static int mvapich_get_task_header (int fd, int *version, int *rank) +static int mvapich_get_task_header (mvapich_state_t *st, + int fd, int *version, int *rank) { /* * dual phase only sends version on first pass */ - if (!mvapich_dual_phase () || protocol_phase == 0) { + if (!mvapich_dual_phase (st) || st->protocol_phase == 0) { if (fd_read_n (fd, version, sizeof (int)) < 0) return error ("mvapich: Unable to read version from task: %m"); } @@ -249,38 +250,39 @@ static int mvapich_get_task_header (int fd, int *version, int *rank) if (fd_read_n (fd, rank, sizeof (int)) < 0) return error ("mvapich: Unable to read task rank: %m"); - if (mvapich_dual_phase () && protocol_phase > 0) + if (mvapich_dual_phase (st) && st->protocol_phase > 0) return (0); - if (protocol_version == -1) - protocol_version = *version; - else if (protocol_version != *version) { - return error 
("mvapich: rank %d version %d != %d", *rank, *version, - protocol_version); + if (st->protocol_version == -1) + st->protocol_version = *version; + else if (st->protocol_version != *version) { + return error ("mvapich: rank %d version %d != %d", + *rank, *version, st->protocol_version); } return (0); } -static int mvapich_handle_task (int fd, struct mvapich_info *mvi) +static int mvapich_handle_task (mvapich_state_t *st, + int fd, struct mvapich_info *mvi) { mvi->fd = fd; - switch (protocol_version) { + switch (st->protocol_version) { case 1: case 2: case 3: - return mvapich_get_task_info (mvi); + return mvapich_get_task_info (st, mvi); case 5: case 6: - if (protocol_phase == 0) + if (st->protocol_phase == 0) return mvapich_get_hostid (mvi); else - return mvapich_get_task_info (mvi); + return mvapich_get_task_info (st, mvi); default: return (error ("mvapich: Unsupported protocol version %d", - protocol_version)); + st->protocol_version)); } return (0); @@ -299,16 +301,16 @@ static int mvapich_handle_task (int fd, struct mvapich_info *mvi) * total of 3*nprocs ints. * */ -static void mvapich_bcast_addrs (void) +static void mvapich_bcast_addrs (mvapich_state_t *st) { struct mvapich_info *m; - int out_addrs_len = 3 * nprocs * sizeof (int); + int out_addrs_len = 3 * st->nprocs * sizeof (int); int *out_addrs = xmalloc (out_addrs_len); int i = 0; int j = 0; - for (i = 0; i < nprocs; i++) { - m = mvarray[i]; + for (i = 0; i < st->nprocs; i++) { + m = st->mvarray[i]; /* * lids are found in addrs[rank] for each process */ @@ -317,29 +319,29 @@ static void mvapich_bcast_addrs (void) /* * hostids are the last entry in addrs */ - out_addrs[2 * nprocs + i] = + out_addrs[2 * st->nprocs + i] = m->addr[(m->addrlen/sizeof (int)) - 1]; } - for (i = 0; i < nprocs; i++) { - m = mvarray[i]; + for (i = 0; i < st->nprocs; i++) { + m = st->mvarray[i]; /* * qp array is tailored to each process. */ - for (j = 0; j < nprocs; j++) - out_addrs[nprocs + j] = - (i == j) ? 
-1 : mvarray[j]->addr[i]; + for (j = 0; j < st->nprocs; j++) + out_addrs[st->nprocs + j] = + (i == j) ? -1 : st->mvarray[j]->addr[i]; fd_write_n (m->fd, out_addrs, out_addrs_len); /* * Protocol version 3 requires pid list to be sent next */ - if (mvapich_requires_pids ()) { - for (j = 0; j < nprocs; j++) - fd_write_n (m->fd, &mvarray[j]->pid, - mvarray[j]->pidlen); + if (mvapich_requires_pids (st)) { + for (j = 0; j < st->nprocs; j++) + fd_write_n (m->fd, &st->mvarray[j]->pid, + st->mvarray[j]->pidlen); } } @@ -348,26 +350,26 @@ static void mvapich_bcast_addrs (void) return; } -static void mvapich_bcast_hostids (void) +static void mvapich_bcast_hostids (mvapich_state_t *st) { int * hostids; int i = 0; - size_t len = nprocs * sizeof (int); + size_t len = st->nprocs * sizeof (int); hostids = xmalloc (len); - for (i = 0; i < nprocs; i++) - hostids [i] = mvarray[i]->hostid; + for (i = 0; i < st->nprocs; i++) + hostids [i] = st->mvarray[i]->hostid; - for (i = 0; i < nprocs; i++) { - struct mvapich_info *mvi = mvarray [i]; + for (i = 0; i < st->nprocs; i++) { + struct mvapich_info *mvi = st->mvarray[i]; int co, rc; if (fd_write_n (mvi->fd, hostids, len) < 0) error ("mvapich: write hostid rank %d: %m", mvi->rank); if ((rc = fd_read_n (mvi->fd, &co, sizeof (int))) <= 0) { close (mvi->fd); - connect_once = 0; + st->connect_once = 0; } else mvi->do_poll = 1; } @@ -375,15 +377,15 @@ static void mvapich_bcast_hostids (void) xfree (hostids); } -static void mvapich_bcast (void) +static void mvapich_bcast (mvapich_state_t *st) { - if (!mvapich_dual_phase () || protocol_phase > 0) - return mvapich_bcast_addrs (); + if (!mvapich_dual_phase (st) || st->protocol_phase > 0) + return mvapich_bcast_addrs (st); else - return mvapich_bcast_hostids (); + return mvapich_bcast_hostids (st); } -static void mvapich_barrier (void) +static void mvapich_barrier (mvapich_state_t *st) { int i; struct mvapich_info *m; @@ -395,17 +397,17 @@ static void mvapich_barrier (void) debug ("mvapich: starting 
barrier"); - for (i = 0; i < nprocs; i++) { + for (i = 0; i < st->nprocs; i++) { int j; - m = mvarray[i]; + m = st->mvarray[i]; if (fd_read_n (m->fd, &j, sizeof (j)) == -1) error("mvapich read on barrier"); } debug ("mvapich: completed barrier for all tasks"); - for (i = 0; i < nprocs; i++) { - m = mvarray[i]; + for (i = 0; i < st->nprocs; i++) { + m = st->mvarray[i]; if (fd_write_n (m->fd, &i, sizeof (i)) == -1) error("mvapich: write on barrier: %m"); close (m->fd); @@ -416,13 +418,13 @@ static void mvapich_barrier (void) } static void -mvapich_print_abort_message (srun_job_t *job, int rank, int dest, - char *msg, int msglen) +mvapich_print_abort_message (mvapich_state_t *st, int rank, + int dest, char *msg, int msglen) { - slurm_step_layout_t *sl = job->step_layout; + slurm_step_layout_t *sl = st->job->step_layout; char *host; - if (!mvapich_abort_sends_rank ()) { + if (!mvapich_abort_sends_rank (st)) { info ("mvapich: Received ABORT message from an MPI process."); return; } @@ -447,7 +449,7 @@ mvapich_print_abort_message (srun_job_t *job, int rank, int dest, openlog ("srun", 0, LOG_USER); syslog (LOG_WARNING, "MVAPICH ABORT [jobid=%u.%u src=%d(%s) dst=%d(%s)]: %s", - job->jobid, job->stepid, rank, host, dest, dsthost, msg); + st->job->jobid, st->job->stepid, rank, host, dest, dsthost, msg); closelog(); } } @@ -459,7 +461,7 @@ mvapich_print_abort_message (srun_job_t *job, int rank, int dest, } -static void mvapich_wait_for_abort(srun_job_t *job) +static void mvapich_wait_for_abort(mvapich_state_t *st) { int src, dst; int ranks[2]; @@ -476,7 +478,7 @@ static void mvapich_wait_for_abort(srun_job_t *job) */ while (1) { slurm_addr addr; - int newfd = slurm_accept_msg_conn (mvapich_fd, &addr); + int newfd = slurm_accept_msg_conn (st->fd, &addr); if (newfd == -1) { fatal("MPI master failed to accept (abort-wait)"); @@ -509,73 +511,73 @@ static void mvapich_wait_for_abort(srun_job_t *job) close(newfd); - mvapich_print_abort_message (job, src, dst, msg, msglen); - 
fwd_signal(job, SIGKILL, opt.max_threads); + mvapich_print_abort_message (st, src, dst, msg, msglen); + slurm_signal_job_step(st->job->jobid, st->job->stepid, SIGKILL); } return; /* but not reached */ } -static void mvapich_mvarray_create (void) +static void mvapich_mvarray_create (mvapich_state_t *st) { int i; - mvarray = xmalloc (nprocs * sizeof (*mvarray)); - for (i = 0; i < nprocs; i++) { - mvarray [i] = mvapich_info_create (); - mvarray [i]->rank = i; + st->mvarray = xmalloc (st->nprocs * sizeof (*(st->mvarray))); + for (i = 0; i < st->nprocs; i++) { + st->mvarray [i] = mvapich_info_create (); + st->mvarray [i]->rank = i; } } -static void mvapich_mvarray_destroy (void) +static void mvapich_mvarray_destroy (mvapich_state_t *st) { int i; - for (i = 0; i < nprocs; i++) - mvapich_info_destroy (mvarray [i]); - xfree (mvarray); + for (i = 0; i < st->nprocs; i++) + mvapich_info_destroy (st->mvarray[i]); + xfree (st->mvarray); } -static int mvapich_rank_from_fd (int fd) +static int mvapich_rank_from_fd (mvapich_state_t *st, int fd) { int rank = 0; - while (mvarray[rank]->fd != fd) + while (st->mvarray[rank]->fd != fd) rank++; return (rank); } -static int mvapich_handle_connection (int fd) +static int mvapich_handle_connection (mvapich_state_t *st, int fd) { int version, rank; - if (protocol_phase == 0 || !connect_once) { - if (mvapich_get_task_header (fd, &version, &rank) < 0) + if (st->protocol_phase == 0 || !st->connect_once) { + if (mvapich_get_task_header (st, fd, &version, &rank) < 0) return (-1); - mvarray [rank]->rank = rank; + st->mvarray[rank]->rank = rank; - if (rank > nprocs - 1) + if (rank > st->nprocs - 1) return (error ("mvapich: task reported invalid rank (%d)", rank)); } else { - rank = mvapich_rank_from_fd (fd); + rank = mvapich_rank_from_fd (st, fd); } - if (mvapich_handle_task (fd, mvarray [rank]) < 0) + if (mvapich_handle_task (st, fd, st->mvarray[rank]) < 0) return (-1); return (0); } -static int poll_mvapich_fds (void) +static int 
poll_mvapich_fds (mvapich_state_t *st) { int i = 0; int j = 0; int rc; int fd; int nfds = 0; - struct pollfd *fds = xmalloc (nprocs * sizeof (struct pollfd)); + struct pollfd *fds = xmalloc (st->nprocs * sizeof (struct pollfd)); - for (i = 0; i < nprocs; i++) { - if (mvarray[i]->do_poll) { - fds[j].fd = mvarray[i]->fd; + for (i = 0; i < st->nprocs; i++) { + if (st->mvarray[i]->do_poll) { + fds[j].fd = st->mvarray[i]->fd; fds[j].events = POLLIN; j++; nfds++; @@ -583,8 +585,11 @@ static int poll_mvapich_fds (void) } mvapich_debug2 ("Going to poll %d fds", nfds); - if ((rc = poll (fds, nfds, -1)) < 0) - return (error ("mvapich: poll: %m")); + if ((rc = poll (fds, nfds, -1)) < 0) { + error ("mvapich: poll: %m"); + xfree (fds); + return SLURM_ERROR; + } i = 0; while (fds[i].revents != POLLIN) @@ -596,16 +601,16 @@ static int poll_mvapich_fds (void) return (fd); } -static int mvapich_get_next_connection (int listenfd) +static int mvapich_get_next_connection (mvapich_state_t *st) { slurm_addr addr; int fd; - if (connect_once && protocol_phase > 0) { - return (poll_mvapich_fds ()); + if (st->connect_once && st->protocol_phase > 0) { + return (poll_mvapich_fds (st)); } - if ((fd = slurm_accept_msg_conn (mvapich_fd, &addr)) < 0) { + if ((fd = slurm_accept_msg_conn (st->fd, &addr)) < 0) { error ("mvapich: accept: %m"); return (-1); } @@ -614,14 +619,14 @@ static int mvapich_get_next_connection (int listenfd) return (fd); } -static void do_timings (void) +static void do_timings (mvapich_state_t *st) { static int initialized = 0; static struct timeval initv = { 0, 0 }; struct timeval tv; struct timeval result; - if (!do_timing) + if (!st->do_timing) return; if (!initialized) { @@ -646,71 +651,72 @@ static void do_timings (void) static void *mvapich_thr(void *arg) { - srun_job_t *job = arg; + mvapich_state_t *st = arg; + mpi_plugin_client_info_t *job = st->job; int i = 0; int first = 1; debug ("mvapich-0.9.x/gen2: thread started: %ld", pthread_self ()); - mvapich_mvarray_create 
(); + mvapich_mvarray_create (st); again: i = 0; - while (i < nprocs) { + while (i < st->nprocs) { int fd; mvapich_debug ("Waiting to accept remote connection %d of %d\n", - i, nprocs); + i, st->nprocs); - if ((fd = mvapich_get_next_connection (mvapich_fd)) < 0) { + if ((fd = mvapich_get_next_connection (st)) < 0) { error ("mvapich: accept: %m"); goto fail; } if (first) { mvapich_debug ("first task checked in"); - do_timings (); + do_timings (st); first = 0; } - if (mvapich_handle_connection (fd) < 0) + if (mvapich_handle_connection (st, fd) < 0) goto fail; i++; } - mvapich_debug ("bcasting mvapich info to %d tasks", nprocs); - mvapich_bcast (); + mvapich_debug ("bcasting mvapich info to %d tasks", st->nprocs); + mvapich_bcast (st); - if (mvapich_dual_phase () && protocol_phase == 0) { - protocol_phase = 1; + if (mvapich_dual_phase (st) && st->protocol_phase == 0) { + st->protocol_phase = 1; goto again; } mvapich_debug ("calling mvapich_barrier"); - mvapich_barrier (); + mvapich_barrier (st); mvapich_debug ("all tasks have checked in"); - do_timings (); + do_timings (st); - mvapich_wait_for_abort (job); + mvapich_wait_for_abort (st); - mvapich_mvarray_destroy (); + mvapich_mvarray_destroy (st); return (NULL); fail: error ("mvapich: fatal error, killing job"); - fwd_signal (job, SIGKILL, opt.max_threads); + slurm_signal_job_step(job->jobid, job->stepid, SIGKILL); return (void *)0; } -static int process_environment (void) +static int process_environment (mvapich_state_t *st) { char *val; if (getenv ("MVAPICH_CONNECT_TWICE")) - connect_once = 0; + st->connect_once = 0; if ((val = getenv ("SLURM_MVAPICH_DEBUG"))) { int level = atoi (val); @@ -719,44 +725,92 @@ static int process_environment (void) } if (getenv ("SLURM_MVAPICH_TIMING")) - do_timing = 1; + st->do_timing = 1; return (0); } -extern int mvapich_thr_create(srun_job_t *job) +static mvapich_state_t * +mvapich_state_create(const mpi_plugin_client_info_t *job) { - short port; - pthread_attr_t attr; - pthread_t 
tid; + mvapich_state_t *state; + + state = (mvapich_state_t *)xmalloc(sizeof(mvapich_state_t)); - if (process_environment () < 0) - return error ("mvapich: Failed to read environment settings\n"); + state->tid = (pthread_t)-1; + state->mvarray = NULL; + state->fd = -1; + state->nprocs = job->step_layout->task_cnt; + state->protocol_version = -1; + state->protocol_phase = 0; + state->connect_once = 1; + state->do_timing = 0; - nprocs = opt.nprocs; + *(state->job) = *job; - if (net_stream_listen(&mvapich_fd, &port) < 0) - return error ("Unable to create ib listen port: %m"); + return state; +} + +static void mvapich_state_destroy(mvapich_state_t *st) +{ + xfree(st); +} + +extern mvapich_state_t *mvapich_thr_create(const mpi_plugin_client_info_t *job, + char ***env) +{ + short port; + pthread_attr_t attr; + mvapich_state_t *st = NULL; + + st = mvapich_state_create(job); + if (process_environment (st) < 0) { + error ("mvapich: Failed to read environment settings\n"); + mvapich_state_destroy(st); + return NULL; + } + if (net_stream_listen(&st->fd, &port) < 0) { + error ("Unable to create ib listen port: %m"); + mvapich_state_destroy(st); + return NULL; + } /* * Accept in a separate thread. 
*/ slurm_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if (pthread_create(&tid, &attr, &mvapich_thr, (void *)job)) - return -1; + if (pthread_create(&st->tid, &attr, &mvapich_thr, (void *)st)) { + slurm_attr_destroy(&attr); + mvapich_state_destroy(st); + return NULL; + } + slurm_attr_destroy(&attr); /* * Set some environment variables in current env so they'll get * passed to all remote tasks */ - setenvf (NULL, "MPIRUN_PORT", "%d", port); - setenvf (NULL, "MPIRUN_NPROCS", "%d", nprocs); - setenvf (NULL, "MPIRUN_ID", "%d", job->jobid); - if (connect_once) - setenvf (NULL, "MPIRUN_CONNECT_ONCE", "1"); + env_array_overwrite_fmt(env, "MPIRUN_PORT", "%d", port); + env_array_overwrite_fmt(env, "MPIRUN_NPROCS", "%d", st->nprocs); + env_array_overwrite_fmt(env, "MPIRUN_ID", "%d", st->job->jobid); + if (st->connect_once) { + env_array_overwrite_fmt(env, "MPIRUN_CONNECT_ONCE", "1"); + } verbose ("mvapich-0.9.[45] master listening on port %d", port); - return 0; + return st; +} + +extern int mvapich_thr_destroy(mvapich_state_t *st) +{ + if (st != NULL) { + if (st->tid != (pthread_t)-1) { + pthread_cancel(st->tid); + pthread_join(st->tid, NULL); + } + mvapich_state_destroy(st); + } + return SLURM_SUCCESS; } diff --git a/src/plugins/mpi/mvapich/mvapich.h b/src/plugins/mpi/mvapich/mvapich.h index b6cde2c12e8dd0c88555ac55a53e520cf6689712..7a4f1ebd0c83bae0d44bc8b342dfeb5ff48d9f74 100644 --- a/src/plugins/mpi/mvapich/mvapich.h +++ b/src/plugins/mpi/mvapich/mvapich.h @@ -40,8 +40,12 @@ # include "config.h" #endif -#include "src/srun/srun_job.h" -#include "src/slurmd/slurmstepd/slurmstepd_job.h" +#include "src/common/slurm_xlator.h" +#include "src/common/mpi.h" #include "src/common/env.h" -extern int mvapich_thr_create(srun_job_t *job); +typedef struct mvapich_state mvapich_state_t; + +extern mvapich_state_t *mvapich_thr_create(const mpi_plugin_client_info_t *job, + char ***env); +extern int mvapich_thr_destroy(mvapich_state_t *state); diff --git 
a/src/plugins/mpi/none/mpi_none.c b/src/plugins/mpi/none/mpi_none.c index 80d4bffc4d58bb1269fe6156a52f4e4d4437c955..f9288a252aa88f97595806bb23a72107e416cb57 100644 --- a/src/plugins/mpi/none/mpi_none.c +++ b/src/plugins/mpi/none/mpi_none.c @@ -47,8 +47,7 @@ #include <slurm/slurm_errno.h> #include "src/common/slurm_xlator.h" -#include "src/srun/srun_job.h" -#include "src/slurmd/slurmstepd/slurmstepd_job.h" +#include "src/common/mpi.h" #include "src/common/env.h" /* @@ -82,24 +81,27 @@ const char plugin_name[] = "mpi none plugin"; const char plugin_type[] = "mpi/none"; const uint32_t plugin_version = 100; -int mpi_p_init(slurmd_job_t *job) +int p_mpi_hook_slurmstepd_task(const mpi_plugin_task_info_t*job, + char ***env) { debug("Using mpi/none"); return SLURM_SUCCESS; } -int mpi_p_thr_create(srun_job_t *job) +mpi_plugin_client_state_t * +p_mpi_hook_client_prelaunch(const mpi_plugin_client_info_t *job, char ***env) { debug("Using mpi/none"); - return SLURM_SUCCESS; + /* only return NULL on error */ + return (void *)0xdeadbeef; } -int mpi_p_single_task() +int p_mpi_hook_client_single_task_per_node() { return false; } -int mpi_p_exit() +int p_mpi_hook_client_fini(mpi_plugin_client_state_t *state) { return SLURM_SUCCESS; } diff --git a/src/plugins/mpi/openmpi/mpi_openmpi.c b/src/plugins/mpi/openmpi/mpi_openmpi.c index 3be86bdee60ab37d27ad699726680aa2bbdaa0ba..64fb70eddb08a66d96d563a293f2f9a672871f2a 100644 --- a/src/plugins/mpi/openmpi/mpi_openmpi.c +++ b/src/plugins/mpi/openmpi/mpi_openmpi.c @@ -47,8 +47,7 @@ #include <slurm/slurm_errno.h> #include "src/common/slurm_xlator.h" -#include "src/srun/srun_job.h" -#include "src/slurmd/slurmstepd/slurmstepd_job.h" +#include "src/common/mpi.h" #include "src/common/env.h" /* @@ -82,24 +81,27 @@ const char plugin_name[] = "OpenMPI plugin"; const char plugin_type[] = "mpi/openmpi"; const uint32_t plugin_version = 100; -int mpi_p_init(slurmd_job_t *job) +int p_mpi_hook_slurmstepd_task(const mpi_plugin_task_info_t *job, + char 
***env) { debug("Using mpi/openmpi"); return SLURM_SUCCESS; } -int mpi_p_thr_create(srun_job_t *job) +mpi_plugin_client_state_t * +p_mpi_hook_client_prelaunch(const mpi_plugin_client_info_t *job, char ***env) { debug("Using mpi/openmpi"); - return SLURM_SUCCESS; + /* only return NULL on error */ + return (void *)0xdeadbeef; } -int mpi_p_single_task() +int p_mpi_hook_client_single_task_per_node() { return false; } -int mpi_p_exit() +int p_mpi_hook_client_fini() { return SLURM_SUCCESS; } diff --git a/src/slaunch/opt.c b/src/slaunch/opt.c index d013cf34fca83369a9b1a8c09fa25a448a8b538a..83658f41f5660ed313b3448f930820d696da8376 100644 --- a/src/slaunch/opt.c +++ b/src/slaunch/opt.c @@ -868,7 +868,7 @@ _process_env_var(env_vars_t *e, const char *val) break; case OPT_MPI: - if (srun_mpi_init((char *)val) == SLURM_ERROR) { + if (mpi_hook_client_init((char *)val) == SLURM_ERROR) { fatal("\"%s=%s\" -- invalid MPI type, " "--mpi=list for acceptable types.", e->var, val); @@ -1174,7 +1174,8 @@ void set_options(const int argc, char **argv) optarg); break; case LONG_OPT_MPI: - if (srun_mpi_init((char *)optarg) == SLURM_ERROR) { + if (mpi_hook_client_init((char *)optarg) + == SLURM_ERROR) { fatal("\"--mpi=%s\" -- long invalid MPI type, " "--mpi=list for acceptable types.", optarg); diff --git a/src/slurmd/slurmstepd/task.c b/src/slurmd/slurmstepd/task.c index 7141b380b51f4fcb24d6ec1906b1190edc367b63..417ac69b93add329752f36f080438d5f139b926c 100644 --- a/src/slurmd/slurmstepd/task.c +++ b/src/slurmd/slurmstepd/task.c @@ -258,6 +258,26 @@ _build_path(char* fname, char **prog_env) return file_path; } +static void +_setup_mpi(slurmd_job_t *job, int ltaskid) +{ + mpi_plugin_task_info_t info[1]; + + info->jobid = job->jobid; + info->stepid = job->stepid; + info->nnodes = job->nnodes; + info->nodeid = job->nodeid; + info->ntasks = job->nprocs; + info->ltasks = job->ntasks; + info->gtaskid = job->task[ltaskid]->gtid; + info->ltaskid = job->task[ltaskid]->id; + info->self = 
job->envtp->self; + info->client = job->envtp->cli; + + mpi_hook_slurmstepd_task(info, &job->env); +} + + /* * Current process is running as the user when this is called. */ @@ -327,7 +347,7 @@ exec_task(slurmd_job_t *job, int i, int waitfd) exit(1); } - slurmd_mpi_init (job, task->gtid); + _setup_mpi(job, i); pdebug_stop_current(job); } diff --git a/src/srun/launch.c b/src/srun/launch.c index de11da86f5122644f4fa793247bb1d67b5169414..94249c60e1c2b1770da4c5ea1037d2921c32f9ed 100644 --- a/src/srun/launch.c +++ b/src/srun/launch.c @@ -159,7 +159,7 @@ launch(void *arg) r.task_flags |= TASK_PARALLEL_DEBUG; /* Node specific message contents */ - if (slurm_mpi_single_task_per_node ()) { + if (mpi_hook_client_single_task_per_node ()) { for (i = 0; i < job->step_layout->node_cnt; i++) job->step_layout->tasks[i] = 1; } diff --git a/src/srun/msg.c b/src/srun/msg.c index 342a8aed64469fe14275a96ed1e8ce5bb5813fb4..43d6f8ee0b3f6db8d5185c70387e96e14248dc9e 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -860,7 +860,7 @@ _exit_handler(srun_job_t *job, slurm_msg_t *exit_msg) tasks_exited++; debug2("looking for %d got %d", opt.nprocs, tasks_exited); if ((tasks_exited == opt.nprocs) - || (slurm_mpi_single_task_per_node () + || (mpi_hook_client_single_task_per_node () && (tasks_exited == job->nhosts))) { debug2("All tasks exited"); update_job_state(job, SRUN_JOB_TERMINATED); diff --git a/src/srun/opt.c b/src/srun/opt.c index 05328f80226a55a68e1b3e6fc89eca4cd3f374a5..388e701393ccf094b991b0d798fc81359c634123 100644 --- a/src/srun/opt.c +++ b/src/srun/opt.c @@ -1209,7 +1209,7 @@ _process_env_var(env_vars_t *e, const char *val) break; case OPT_MPI: - if (srun_mpi_init((char *)val) == SLURM_ERROR) { + if (mpi_hook_client_init((char *)val) == SLURM_ERROR) { fatal("\"%s=%s\" -- invalid MPI type, " "--mpi=list for acceptable types.", e->var, val); @@ -1786,7 +1786,8 @@ void set_options(const int argc, char **argv, int first) } break; case LONG_OPT_MPI: - if (srun_mpi_init((char *)optarg) 
== SLURM_ERROR) { + if (mpi_hook_client_init((char *)optarg) + == SLURM_ERROR) { fatal("\"--mpi=%s\" -- long invalid MPI type, " "--mpi=list for acceptable types.", optarg); diff --git a/src/srun/srun.c b/src/srun/srun.c index 2bca734e686313c49718f45e784c7a2319096958..31ffcfad4baf550cb1e132cd1fb6397359870989 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -99,6 +99,7 @@ #define TYPE_TEXT 1 #define TYPE_SCRIPT 2 +mpi_plugin_client_info_t mpi_job_info[1]; /* * forward declaration of static funcs @@ -135,6 +136,8 @@ int srun(int ac, char **av) uint32_t job_id = 0; log_options_t logopt = LOG_OPTS_STDERR_ONLY; slurm_step_io_fds_t fds = SLURM_STEP_IO_FDS_INITIALIZER; + char **mpi_env = NULL; + mpi_plugin_client_state_t *mpi_state; env->stepid = -1; env->procid = -1; @@ -376,8 +379,13 @@ int srun(int ac, char **av) if (msg_thr_create(job) < 0) job_fatal(job, "Unable to create msg thread"); - if (slurm_mpi_thr_create(job) < 0) + mpi_job_info->jobid = job->jobid; + mpi_job_info->stepid = job->stepid; + mpi_job_info->step_layout = job->step_layout; + if (!(mpi_state = mpi_hook_client_prelaunch(mpi_job_info, &mpi_env))) job_fatal (job, "Failed to initialize MPI"); + env_array_set_environment(mpi_env); + env_array_free(mpi_env); srun_set_stdio_fds(job, &fds); job->client_io = client_io_handler_create(fds, @@ -457,7 +465,7 @@ int srun(int ac, char **av) debug("done"); - if (slurm_mpi_exit () < 0) + if (mpi_hook_client_fini (mpi_state) < 0) ; /* eh, ignore errors here */ _run_srun_epilog(job);