diff --git a/slurm/slurm.h.in b/slurm/slurm.h.in index f040a3c734a3946c6122b6be26ef99b8e6879d10..9f6e44ed0cb94fb6347c728702bf15d9a97b9641 100644 --- a/slurm/slurm.h.in +++ b/slurm/slurm.h.in @@ -317,9 +317,6 @@ enum node_states { /* Define keys for ctx_key argument of slurm_step_ctx_get() and * slurm_step_ctx_set() */ enum ctx_keys { - SLURM_STEP_CTX_ARGS, /* set argument count and values */ - SLURM_STEP_CTX_CHDIR, /* set directory to run from */ - SLURM_STEP_CTX_ENV, /* set environment variable count and values */ SLURM_STEP_CTX_STEPID, /* get the created job step id */ SLURM_STEP_CTX_TASKS, /* get array of task count on each node */ SLURM_STEP_CTX_TID, /* get array of task IDs for specified node */ @@ -328,7 +325,8 @@ enum ctx_keys { SLURM_STEP_CTX_SWITCH_JOB, SLURM_STEP_CTX_NUM_HOSTS, SLURM_STEP_CTX_HOST, - SLURM_STEP_CTX_JOBID + SLURM_STEP_CTX_JOBID, + SLURM_STEP_CTX_USER_MANAGED_SOCKETS }; /*****************************************************************************\ @@ -560,12 +558,17 @@ typedef struct { uint16_t envc; char **env; char *cwd; + bool user_managed_io; + + /* START - only used if user_managed_io is false */ bool buffered_stdio; bool labelio; char *remote_output_filename; char *remote_error_filename; char *remote_input_filename; slurm_step_io_fds_t local_fds; + /* END - only used if user_managed_io is false */ + uint32_t gid; bool multi_prog; uint32_t slurmd_debug; /* remote slurmd debug level */ @@ -773,8 +776,7 @@ typedef struct slurm_update_node_msg { typedef struct partition_info update_part_msg_t; -/* Opaque data type for slurm_step_ctx_*, slurm_step_launch, slurm_spawn, and - * slurm_spawn_kill functions */ +/* Opaque data type for slurm_step_ctx_* functions */ typedef struct slurm_step_ctx_struct *slurm_step_ctx; /*****************************************************************************\ @@ -1044,12 +1046,18 @@ extern int slurm_jobinfo_ctx_get PARAMS((switch_jobinfo_t jobinfo, int data_type, void *data)); /* - * slurm_step_ctx_set - set parameters in job step context. + * slurm_step_ctx_daemon_per_node_hack - Hack the step context + * to run a single process per node, regardless of the settings + * selected at slurm_step_ctx_create time. + * + * This is primarily used on AIX by the slurm_ll_api in support of + * poe. The slurm_ll_api will want to launch a single pmd daemon + * on each node regardless of the number of tasks running on each + * node. * IN ctx - job step context generated by slurm_step_ctx_create * RET SLURM_SUCCESS or SLURM_ERROR (with slurm_errno set) */ -extern int slurm_step_ctx_set PARAMS((slurm_step_ctx ctx, - int ctx_key, ...)); +extern int slurm_step_ctx_daemon_per_node_hack PARAMS((slurm_step_ctx ctx)); /* * slurm_step_ctx_destroy - free allocated memory for a job step context. 
@@ -1058,27 +1066,6 @@ extern int slurm_step_ctx_set PARAMS((slurm_step_ctx ctx, */ extern int slurm_step_ctx_destroy PARAMS((slurm_step_ctx ctx)); - -/* - * slurm_spawn - spawn tasks for the given job step context - * IN ctx - job step context generated by slurm_step_ctx_create - * IN fd_array - array of socket file descriptors to connect with - * stdin, stdout, and stderr of spawned task - * RET SLURM_SUCCESS or SLURM_ERROR (with slurm_errno set) - */ -extern int slurm_spawn PARAMS((slurm_step_ctx ctx, - int *fd_array)); - -/* - * slurm_spawn_kill - send the specified signal to an existing job step - * IN ctx - job step context generated by slurm_step_ctx_create - * IN signal - signal number - * RET SLURM_SUCCESS or SLURM_ERROR (with slurm_errno set) - */ -extern int slurm_spawn_kill PARAMS((slurm_step_ctx ctx, - uint16_t signal)); - - /* * slurm_job_step_launch_t_init - initialize a user-allocated * slurm_job_step_launch_t structure with default values. diff --git a/src/api/Makefile.am b/src/api/Makefile.am index c1c8a501d6c1cdddb654dd783aaf46c92c990967..585fd0c954404112c6dd4cfb30e48b7690a55068 100644 --- a/src/api/Makefile.am +++ b/src/api/Makefile.am @@ -61,7 +61,6 @@ slurmapi_src = \ partition_info.c \ signal.c \ slurm_pmi.c slurm_pmi.h \ - spawn.c \ step_ctx.c step_ctx.h \ step_io.c step_io.h \ step_launch.c step_launch.h \ diff --git a/src/api/Makefile.in b/src/api/Makefile.in index 50506b8355e2995c0eeaef0f202c2316649f5b1f..ba9238878fb68a4a4ae33314584f1fd189ce2e42 100644 --- a/src/api/Makefile.in +++ b/src/api/Makefile.in @@ -87,7 +87,7 @@ libslurm_la_DEPENDENCIES = $(am__DEPENDENCIES_1) am__objects_1 = allocate.lo cancel.lo checkpoint.lo complete.lo \ config_info.lo init_msg.lo job_info.lo job_step_info.lo \ node_info.lo node_select_info.lo partition_info.lo signal.lo \ - slurm_pmi.lo spawn.lo step_ctx.lo step_io.lo step_launch.lo \ + slurm_pmi.lo step_ctx.lo step_io.lo step_launch.lo \ pmi_server.lo submit.lo suspend.lo reconfigure.lo \ update_config.lo am_libslurm_la_OBJECTS = $(am__objects_1) @@ -347,7 +347,6 @@ slurmapi_src = \ partition_info.c \ signal.c \ slurm_pmi.c slurm_pmi.h \ - spawn.c \ step_ctx.c step_ctx.h \ step_io.c step_io.h \ step_launch.c step_launch.h \ @@ -488,7 +487,6 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/reconfigure.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/signal.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/slurm_pmi.Plo@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/spawn.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/step_ctx.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/step_io.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/step_launch.Plo@am__quote@ diff --git a/src/api/spawn.c b/src/api/spawn.c deleted file mode 100644 index 2f737b9a76f4e954d7beca0a3384962e08a34faa..0000000000000000000000000000000000000000 --- a/src/api/spawn.c +++ /dev/null @@ -1,448 +0,0 @@ -/*****************************************************************************\ - * spawn.c - spawn task functions for use by AIX/POE - * - * $Id$ - ***************************************************************************** - * Copyright (C) 2004 The Regents of the University of California. - * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). - * Written by Morris Jette <jette1@llnl.gov>. - * UCRL-CODE-217948. - * - * This file is part of SLURM, a resource management program. - * For details, see <http://www.llnl.gov/linux/slurm/>. 
- * - * SLURM is free software; you can redistribute it and/or modify it under - * the terms of the GNU General Public License as published by the Free - * Software Foundation; either version 2 of the License, or (at your option) - * any later version. - * - * In addition, as a special exception, the copyright holders give permission - * to link the code of portions of this program with the OpenSSL library under - * certain conditions as described in each individual source file, and - * distribute linked combinations including the two. You must obey the GNU - * General Public License in all respects for all of the code used other than - * OpenSSL. If you modify file(s) with this exception, you may extend this - * exception to your version of the file(s), but you are not obligated to do - * so. If you do not wish to do so, delete this exception statement from your - * version. If you delete this exception statement from all source files in - * the program, then also delete it here. - * - * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY - * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS - * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more - * details. - * - * You should have received a copy of the GNU General Public License along - * with SLURM; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -\*****************************************************************************/ - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include <errno.h> -#include <pthread.h> -#include <stdarg.h> -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <unistd.h> -#include <netinet/in.h> -#include <sys/param.h> -#include <sys/socket.h> -#include <sys/types.h> - -#include <slurm/slurm.h> - -#include "src/common/slurm_step_layout.h" -#include "src/common/hostlist.h" -#include "src/common/slurm_protocol_api.h" -#include "src/common/slurm_protocol_defs.h" -#include "src/common/read_config.h" -#include "src/common/xmalloc.h" -#include "src/common/xstring.h" - -#include "src/api/step_ctx.h" - -#define _DEBUG 0 -#define _MAX_THREAD_COUNT 50 - -extern char **environ; - -typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t; -typedef struct thd { - pthread_t thread; /* thread ID */ - pthread_attr_t attr; /* pthread attributes */ - state_t state; /* thread state */ - time_t tstart; /* time thread started */ - slurm_msg_t * req; /* the message to send */ -} thd_t; - -static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t thread_cond = PTHREAD_COND_INITIALIZER; -static uint32_t threads_active = 0; /* currently active threads */ - -#if _DEBUG -static void _dump_ctx(slurm_step_ctx ctx); -#endif -static int _envcount(char **env); -static int _p_launch(slurm_msg_t *req, slurm_step_ctx ctx); -static int _sock_bind_wild(int sockfd); -static void * _thread_per_node_rpc(void *args); -static int _validate_ctx(slurm_step_ctx ctx); - -/* - * slurm_spawn - spawn tasks for the given job step context - * IN ctx - job step context generated by slurm_step_ctx_create - * IN fd_array - array of socket file descriptors to connect with - * stdin, stdout, and stderr of spawned task - * RET SLURM_SUCCESS or SLURM_ERROR (with slurm_errno set) - */ -extern int slurm_spawn (slurm_step_ctx ctx, int *fd_array) -{ - spawn_task_request_msg_t *msg_array_ptr; - int *sock_array; - slurm_msg_t *req_array_ptr; - int i, rc = SLURM_SUCCESS; - 
uint16_t slurmd_debug = 0; - char *env_var; - int task_cnt = 0; - uint32_t *cpus = NULL; - slurm_step_layout_t *step_layout = ctx->step_resp->step_layout; - hostlist_t hl = NULL; - char *name = NULL; - - if ((ctx == NULL) || - (ctx->magic != STEP_CTX_MAGIC) || - (fd_array == NULL)) { - slurm_seterrno(EINVAL); - return SLURM_ERROR; - } - - if (_validate_ctx(ctx)) - return SLURM_ERROR; - - /* get slurmd_debug level from SLURMD_DEBUG env var */ - env_var = getenv("SLURMD_DEBUG"); - if (env_var) { - i = atoi(env_var); - if (i >= 0) - slurmd_debug = i; - } - - /* validate fd_array and bind them to ports */ - sock_array = xmalloc(step_layout->node_cnt * sizeof(int)); - for (i=0; i<step_layout->node_cnt; i++) { - if (fd_array[i] < 0) { - slurm_seterrno(EINVAL); - free(sock_array); - return SLURM_ERROR; - } - sock_array[i] = _sock_bind_wild(fd_array[i]); - if (sock_array[i] < 0) { - slurm_seterrno(EINVAL); - free(sock_array); - return SLURM_ERROR; - } - listen(fd_array[i], 5); - task_cnt += step_layout->tasks[i]; - } - cpus = step_layout->tasks; - - msg_array_ptr = xmalloc(sizeof(spawn_task_request_msg_t) * - step_layout->node_cnt); - req_array_ptr = xmalloc(sizeof(slurm_msg_t) * - step_layout->node_cnt); - - hl = hostlist_create(step_layout->node_list); - for (i=0; i<step_layout->node_cnt; i++) { - spawn_task_request_msg_t *r = &msg_array_ptr[i]; - slurm_msg_t *m = &req_array_ptr[i]; - - /* Common message contents */ - r->job_id = ctx->job_id; - r->uid = ctx->user_id; - r->argc = ctx->argc; - r->argv = ctx->argv; - r->cred = ctx->step_resp->cred; - r->job_step_id = ctx->step_resp->job_step_id; - r->envc = ctx->envc; - r->env = ctx->env; - r->cwd = ctx->cwd; - r->nnodes = step_layout->node_cnt; - r->nprocs = task_cnt; - r->switch_job = ctx->step_resp->switch_job; - r->slurmd_debug = slurmd_debug; - /* Task specific message contents */ - r->global_task_id = step_layout->tids[i][0]; - r->cpus_allocated = cpus[i]; - r->complete_nodelist = xstrdup(step_layout->node_list); - r->io_port = sock_array[i]; - slurm_msg_t_init(m); - - m->msg_type = REQUEST_SPAWN_TASK; - m->data = r; - name = hostlist_shift(hl); - if(!name) { - error("hostlist incomplete for this job request"); - hostlist_destroy(hl); - return SLURM_ERROR; - } - if(slurm_conf_get_addr(name, &m->address) - == SLURM_ERROR) { - error("_init_task_layout: can't get addr for " - "host %s", name); - free(name); - hostlist_destroy(hl); - return SLURM_ERROR; - } - free(name); -#if _DEBUG - printf("tid=%d, fd=%d, port=%u, node_id=%u\n", - step_layout->tids[i][0], - fd_array[i], r->io_port, i); -#endif - } - hostlist_destroy(hl); - rc = _p_launch(req_array_ptr, ctx); - - xfree(msg_array_ptr); - xfree(req_array_ptr); - xfree(sock_array); - - return rc; -} - -/* - * slurm_spawn_kill - send the specified signal to an existing job step - * IN ctx - job step context generated by slurm_step_ctx_create - * IN signal - signal number - * RET SLURM_SUCCESS or SLURM_ERROR (with slurm_errno set) - */ -extern int -slurm_spawn_kill (slurm_step_ctx ctx, uint16_t signal) -{ - if ((ctx == NULL) || - (ctx->magic != STEP_CTX_MAGIC)) { - slurm_seterrno(EINVAL); - return SLURM_ERROR; - } - - return slurm_kill_job_step (ctx->job_id, - ctx->step_resp->job_step_id, signal); -} - - -/* - * Returns the port number in host byte order. 
- */ -static int _sock_bind_wild(int sockfd) -{ - socklen_t len; - struct sockaddr_in sin; - - memset(&sin, 0, sizeof(sin)); - sin.sin_family = AF_INET; - sin.sin_addr.s_addr = htonl(INADDR_ANY); - sin.sin_port = htons(0); /* bind ephemeral port */ - - if (bind(sockfd, (struct sockaddr *) &sin, sizeof(sin)) < 0) - return (-1); - len = sizeof(sin); - if (getsockname(sockfd, (struct sockaddr *) &sin, &len) < 0) - return (-1); - return ntohs(sin.sin_port); -} - - -/* validate the context of ctx, set default values as needed */ -static int _validate_ctx(slurm_step_ctx ctx) -{ - int rc = SLURM_SUCCESS; - - if (ctx->cwd == NULL) { - ctx->cwd = xmalloc(MAXPATHLEN); - if (ctx->cwd == NULL) { - slurm_seterrno(ENOMEM); - return SLURM_ERROR; - } - getcwd(ctx->cwd, MAXPATHLEN); - } - - if ((ctx->env_set == 0) && (ctx->envc == 0)) { - ctx->envc = _envcount(environ); - ctx->env = environ; - } - -#if _DEBUG - _dump_ctx(ctx); -#endif - return rc; -} - - -/* return number of elements in environment 'env' */ -static int _envcount(char **env) -{ - int envc = 0; - while (env[envc] != NULL) - envc++; - return (envc); -} - - -#if _DEBUG -/* dump the contents of a job step context */ -static void _dump_ctx(slurm_step_ctx ctx) -{ - int i, j; - - if ((ctx == NULL) || - (ctx->magic != STEP_CTX_MAGIC)) { - printf("Invalid _dump_ctx argument\n"); - return; - } - - printf("job_id = %u\n", ctx->job_id); - printf("user_id = %u\n", ctx->user_id); - printf("num_hosts = %u\n", ctx->num_hosts); - printf("num_tasks = %u\n", ctx->num_tasks); - printf("task_dist = %u\n", ctx->task_dist); - - printf("step_id = %u\n", ctx->step_resp->job_step_id); - printf("nodelist = %s\n", ctx->step_resp->node_list); - - printf("cws = %s\n", ctx->cwd); - - for (i=0; i<ctx->argc; i++) { - printf("argv[%d] = %s\n", i, ctx->argv[i]); - if (i > 5) { - printf("...\n"); - break; - } - } - - for (i=0; i<ctx->envc; i++) { - if (strlen(ctx->env[i]) > 50) - printf("env[%d] = %.50s...\n", i, ctx->env[i]); - else - printf("env[%d] = %s\n", i, ctx->env[i]); - if (i > 5) { - printf("...\n"); - break; - } - } - - for (i=0; i<ctx->step_resp->node_cnt; i++) { - printf("host=%s cpus=%u tasks=%u", - ctx->host[i], ctx->cpus[i], ctx->tasks[i]); - for (j=0; j<ctx->tasks[i]; j++) - printf(" tid[%d]=%u", j, ctx->tids[i][j]); - printf("\n"); - } - - printf("\n"); -} -#endif - - -/* parallel (multi-threaded) task launch, - * transmits all RPCs in parallel with timeout */ -static int _p_launch(slurm_msg_t *req, slurm_step_ctx ctx) -{ - int rc = SLURM_SUCCESS, i; - thd_t *thd; - slurm_step_layout_t *step_layout = ctx->step_resp->step_layout; - - thd = xmalloc(sizeof(thd_t) * step_layout->node_cnt); - if (thd == NULL) { - slurm_seterrno(ENOMEM); - return SLURM_ERROR; - } - - for (i=0; i<step_layout->node_cnt; i++) { - thd[i].state = DSH_NEW; - thd[i].req = &req[i]; - } - - /* start all the other threads (up to _MAX_THREAD_COUNT active) */ - for (i=0; i<step_layout->node_cnt; i++) { - /* wait until "room" for another thread */ - slurm_mutex_lock(&thread_mutex); - while (threads_active >= _MAX_THREAD_COUNT) { - pthread_cond_wait(&thread_cond, &thread_mutex); - } - - slurm_attr_init(&thd[i].attr); - (void) pthread_attr_setdetachstate(&thd[i].attr, - PTHREAD_CREATE_DETACHED); - while ((rc = pthread_create(&thd[i].thread, &thd[i].attr, - _thread_per_node_rpc, - (void *) &thd[i]))) { - if (threads_active) - pthread_cond_wait(&thread_cond, &thread_mutex); - else { - slurm_mutex_unlock(&thread_mutex); - sleep(1); - slurm_mutex_lock(&thread_mutex); - } - } - 
slurm_attr_destroy(&thd[i].attr); - - threads_active++; - slurm_mutex_unlock(&thread_mutex); - } - - /* wait for all tasks to terminate */ - slurm_mutex_lock(&thread_mutex); - for (i=0; i<step_layout->node_cnt; i++) { - while (thd[i].state < DSH_DONE) { - /* wait until another thread completes*/ - pthread_cond_wait(&thread_cond, &thread_mutex); - } - } - slurm_mutex_unlock(&thread_mutex); - - xfree(thd); - return rc; -} - - -/* - * _thread_per_node_rpc - thread to issue an RPC to a single node - * IN/OUT args - pointer to thd_t entry - */ -static void *_thread_per_node_rpc(void *args) -{ - int rc; - thd_t *thread_ptr = (thd_t *) args; - state_t new_state; - - thread_ptr->tstart = time(NULL); - thread_ptr->state = DSH_ACTIVE; - - if (slurm_send_recv_rc_msg_only_one(thread_ptr->req, &rc, 0) < 0) { - new_state = DSH_FAILED; - goto cleanup; - } - - switch (rc) { - case SLURM_SUCCESS: - new_state = DSH_DONE; - break; - default: - slurm_seterrno(rc); - new_state = DSH_FAILED; - } - - cleanup: - slurm_mutex_lock(&thread_mutex); - thread_ptr->state = new_state; - threads_active--; - /* Signal completion so another thread can replace us */ - slurm_mutex_unlock(&thread_mutex); - pthread_cond_signal(&thread_cond); - - return (void *) NULL; -} diff --git a/src/api/step_ctx.c b/src/api/step_ctx.c index abf401f8c7ebb01c2d35333f38827dcf14743665..201e289b01f9ddab5b273ecbd51ec5da5b9ab596 100644 --- a/src/api/step_ctx.c +++ b/src/api/step_ctx.c @@ -53,8 +53,6 @@ #include "src/api/step_ctx.h" -static void _xcopy_char_array(char ***argv_p, char **argv, int cnt); -static void _xfree_char_array(char ***argv_p, int cnt); static job_step_create_request_msg_t *_copy_step_req( job_step_create_request_msg_t *step_req); @@ -137,9 +135,10 @@ slurm_step_ctx_get (slurm_step_ctx ctx, int ctx_key, ...) job_step_create_response_msg_t ** step_resp_pptr; slurm_cred_t *cred; /* Slurm job credential */ switch_jobinfo_t *switch_job; + int *int_ptr; + int **int_array_pptr = (int **) NULL; - if ((ctx == NULL) || - (ctx->magic != STEP_CTX_MAGIC)) { + if ((ctx == NULL) || (ctx->magic != STEP_CTX_MAGIC)) { slurm_seterrno(EINVAL); return SLURM_ERROR; } @@ -201,6 +200,20 @@ slurm_step_ctx_get (slurm_step_ctx ctx, int ctx_key, ...) *char_array_pptr = nodelist_nth_host( ctx->step_resp->step_layout->node_list, node_inx); break; + case SLURM_STEP_CTX_USER_MANAGED_SOCKETS: + int_ptr = va_arg(ap, int *); + int_array_pptr = va_arg(ap, int **); + if (ctx->launch_state == NULL + || ctx->launch_state->user_managed_io == false + || ctx->launch_state->io.user == NULL) { + *int_ptr = 0; + *int_array_pptr = (int *)NULL; + rc = SLURM_ERROR; + break; + } + *int_ptr = ctx->launch_state->tasks_requested; + *int_array_pptr = ctx->launch_state->io.user->sockets; + break; default: slurm_seterrno(EINVAL); rc = SLURM_ERROR; @@ -228,64 +241,6 @@ slurm_jobinfo_ctx_get(switch_jobinfo_t jobinfo, int data_type, void *data) return switch_g_get_jobinfo(jobinfo, data_type, data); } -/* - * slurm_step_ctx_set - set parameters in job step context. - * IN ctx - job step context generated by slurm_step_ctx_create - * RET SLURM_SUCCESS or SLURM_ERROR (with slurm_errno set) - */ -extern int -slurm_step_ctx_set (slurm_step_ctx ctx, int ctx_key, ...) 
-{ - va_list ap; - int rc = SLURM_SUCCESS; - - if ((ctx == NULL) || - (ctx->magic != STEP_CTX_MAGIC)) { - slurm_seterrno(EINVAL); - return SLURM_ERROR; - } - - va_start(ap, ctx_key); - switch (ctx_key) { - case SLURM_STEP_CTX_ARGS: - if (ctx->argv) - _xfree_char_array(&ctx->argv, ctx->argc); - ctx->argc = va_arg(ap, int); - if ((ctx->argc < 1) || (ctx->argc > 1024)) { - slurm_seterrno(EINVAL); - break; - } - _xcopy_char_array(&ctx->argv, va_arg(ap, char **), - ctx->argc); - break; - - case SLURM_STEP_CTX_CHDIR: - if (ctx->cwd) - xfree(ctx->cwd); - ctx->cwd = xstrdup(va_arg(ap, char *)); - break; - - case SLURM_STEP_CTX_ENV: - ctx->env_set = 1; - if (ctx->env) - _xfree_char_array(&ctx->env, ctx->envc); - ctx->envc = va_arg(ap, int); - if ((ctx->envc < 1) || (ctx->envc > 1024)) { - slurm_seterrno(EINVAL); - break; - } - _xcopy_char_array(&ctx->env, va_arg(ap, char **), - ctx->envc); - break; - default: - slurm_seterrno(EINVAL); - rc = SLURM_ERROR; - } - va_end(ap); - - return rc; -} - /* * slurm_step_ctx_destroy - free allocated memory for a job step context. @@ -295,43 +250,67 @@ slurm_step_ctx_set (slurm_step_ctx ctx, int ctx_key, ...) extern int slurm_step_ctx_destroy (slurm_step_ctx ctx) { - if ((ctx == NULL) || - (ctx->magic != STEP_CTX_MAGIC)) { + if ((ctx == NULL) || (ctx->magic != STEP_CTX_MAGIC)) { slurm_seterrno(EINVAL); return SLURM_ERROR; } slurm_free_job_step_create_request_msg(ctx->step_req); slurm_free_job_step_create_response_msg(ctx->step_resp); - if (ctx->argv) - _xfree_char_array(&ctx->argv, ctx->argc); - if (ctx->env_set) - _xfree_char_array(&ctx->env, ctx->envc); - xfree(ctx->cwd); step_launch_state_destroy(ctx->launch_state); xfree(ctx); return SLURM_SUCCESS; } -/* xfree an array of character strings as created by _xcopy_char_array */ -static void _xfree_char_array(char ***argv_p, int cnt) +/* + * slurm_step_ctx_daemon_per_node_hack - Hack the step context + * to run a single process per node, regardless of the settings + * selected at slurm_step_ctx_create time. + * + * This is primarily used on AIX by the slurm_ll_api in support of + * poe. The slurm_ll_api will want to launch a single pmd daemon + * on each node regardless of the number of tasks running on each + * node. 
+ * IN ctx - job step context generated by slurm_step_ctx_create + * RET SLURM_SUCCESS or SLURM_ERROR (with slurm_errno set) + */ +extern int +slurm_step_ctx_daemon_per_node_hack(slurm_step_ctx ctx) { - char **argv = *argv_p; + slurm_step_layout_t *new_layout, *old_layout; int i; - for (i=0; i<cnt; i++) - xfree(argv[i]); - xfree(*argv_p); -} -/* copy a character array, free with _xfree_char_array */ -static void _xcopy_char_array(char ***argv_p, char **argv, int cnt) -{ - int i; - char **tmp = xmalloc(sizeof(char *) * cnt); + if ((ctx == NULL) || (ctx->magic != STEP_CTX_MAGIC)) { + slurm_seterrno(EINVAL); + return SLURM_ERROR; + } - for (i=0; i<cnt; i++) - tmp[i] = xstrdup(argv[i]); + /* hack the context node count */ + ctx->step_req->num_tasks = ctx->step_req->node_count; + + /* hack the context step layout */ + old_layout = ctx->step_resp->step_layout; + new_layout = (slurm_step_layout_t *)xmalloc(sizeof(slurm_step_layout_t)); + new_layout->node_cnt = old_layout->node_cnt; + new_layout->task_cnt = old_layout->node_cnt; + new_layout->node_list = xstrdup(old_layout->node_list); + slurm_step_layout_destroy(old_layout); + new_layout->tasks = + (uint32_t *)xmalloc(sizeof(uint32_t) * new_layout->node_cnt); + new_layout->tids = + (uint32_t **)xmalloc(sizeof(uint32_t *) * new_layout->node_cnt); + for (i = 0; i < new_layout->node_cnt; i++) { + new_layout->tasks[i] = 1; + new_layout->tids[i] = (uint32_t *)xmalloc(sizeof(uint32_t)); + new_layout->tids[i][0] = i; + } + ctx->step_resp->step_layout = new_layout; - *argv_p = tmp; + /* recreate the launch state structure now that the settings + have changed */ + step_launch_state_destroy(ctx->launch_state); + ctx->launch_state = step_launch_state_create(ctx); + + return SLURM_SUCCESS; } static job_step_create_request_msg_t *_copy_step_req( diff --git a/src/api/step_ctx.h b/src/api/step_ctx.h index ba0e51c1c6ecf3a96f35019bffa7859b2412c112..c2ea66efc47ccad5a8f6ee58bb97ba698b950e73 100644 --- a/src/api/step_ctx.h +++ b/src/api/step_ctx.h @@ -1,7 +1,7 @@ /*****************************************************************************\ * step_ctx.h - step context declarations * - * $Id: spawn.c 8334 2006-06-07 20:36:04Z morrone $ + * $Id$ ***************************************************************************** * Copyright (C) 2006 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). @@ -51,14 +51,7 @@ struct slurm_step_ctx_struct { job_step_create_request_msg_t *step_req; job_step_create_response_msg_t *step_resp; - char *cwd; /* working directory */ - uint32_t argc; /* count of arguments */ - char **argv; /* argument list */ - uint16_t env_set; /* flag if user set env */ - uint32_t envc; /* count of env vars */ - char **env; /* environment variables */ - - /* Used by slurm_step_launch(), but not slurm_spawn() */ + /* Used by slurm_step_launch() */ struct step_launch_state *launch_state; }; diff --git a/src/api/step_launch.c b/src/api/step_launch.c index 38b74321da4569c138979c847d75080ba652c05b..138aaf000d3704e2f87227ed85a1c6044847c4f1 100644 --- a/src/api/step_launch.c +++ b/src/api/step_launch.c @@ -1,7 +1,7 @@ /*****************************************************************************\ * step_launch.c - launch a parallel job step * - * $Id: spawn.c 7973 2006-05-08 23:52:35Z morrone $ + * $Id$ ***************************************************************************** * Copyright (C) 2006 The Regents of the University of California. 
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). @@ -101,11 +101,13 @@ void slurm_job_step_launch_t_init (slurm_job_step_launch_t *ptr) { static slurm_step_io_fds_t fds = SLURM_STEP_IO_FDS_INITIALIZER; + memset(ptr, 0, sizeof(slurm_job_step_launch_t)); ptr->argc = 0; ptr->argv = NULL; ptr->envc = 0; ptr->env = NULL; ptr->cwd = NULL; + ptr->user_managed_io = false; ptr->buffered_stdio = true; ptr->labelio = false; ptr->remote_output_filename = NULL; @@ -118,6 +120,10 @@ void slurm_job_step_launch_t_init (slurm_job_step_launch_t *ptr) ptr->parallel_debug = false; ptr->task_prolog = NULL; ptr->task_epilog = NULL; + ptr->cpu_bind_type = 0; + ptr->cpu_bind = NULL; + ptr->mem_bind_type = 0; + ptr->mem_bind = NULL; } /* @@ -192,50 +198,51 @@ int slurm_step_launch (slurm_step_ctx ctx, launch.mem_bind_type = params->mem_bind_type; launch.mem_bind = params->mem_bind; launch.multi_prog = params->multi_prog ? 1 : 0; - launch.options = job_options_create(); launch.complete_nodelist = xstrdup(ctx->step_resp->step_layout->node_list); spank_set_remote_options (launch.options); - - launch.ofname = params->remote_output_filename; - launch.efname = params->remote_error_filename; - launch.ifname = params->remote_input_filename; - launch.buffered_stdio = params->buffered_stdio ? 1 : 0; - launch.task_flags = 0; if (params->parallel_debug) launch.task_flags |= TASK_PARALLEL_DEBUG; - /* Node specific message contents */ -/* if (slurm_mpi_single_task_per_node ()) { */ -/* for (i = 0; i < job->num_hosts; i++) */ -/* job->tasks[i] = 1; */ -/* } */ - launch.tasks_to_launch = ctx->step_resp->step_layout->tasks; launch.cpus_allocated = ctx->step_resp->step_layout->tasks; launch.global_task_ids = ctx->step_resp->step_layout->tids; - ctx->launch_state->client_io = - client_io_handler_create(params->local_fds, - ctx->step_req->num_tasks, - ctx->step_req->node_count, - ctx->step_resp->cred, - params->labelio); - if (ctx->launch_state->client_io == NULL) - return SLURM_ERROR; - if (client_io_handler_start(ctx->launch_state->client_io) - != SLURM_SUCCESS) - return SLURM_ERROR; - - launch.num_io_port = ctx->launch_state->client_io->num_listen; - launch.io_port = xmalloc(sizeof(uint16_t) * launch.num_io_port); - for (i = 0; i < launch.num_io_port; i++) { - launch.io_port[i] = - ctx->launch_state->client_io->listenport[i]; + launch.user_managed_io = params->user_managed_io ? 1 : 0; + ctx->launch_state->user_managed_io = params->user_managed_io; + if (!ctx->launch_state->user_managed_io) { + launch.ofname = params->remote_output_filename; + launch.efname = params->remote_error_filename; + launch.ifname = params->remote_input_filename; + launch.buffered_stdio = params->buffered_stdio ? 
1 : 0; + ctx->launch_state->io.normal = + client_io_handler_create(params->local_fds, + ctx->step_req->num_tasks, + ctx->step_req->node_count, + ctx->step_resp->cred, + params->labelio); + if (ctx->launch_state->io.normal == NULL) + return SLURM_ERROR; + if (client_io_handler_start(ctx->launch_state->io.normal) + != SLURM_SUCCESS) + return SLURM_ERROR; + launch.num_io_port = ctx->launch_state->io.normal->num_listen; + launch.io_port = xmalloc(sizeof(uint16_t)*launch.num_io_port); + for (i = 0; i < launch.num_io_port; i++) { + launch.io_port[i] = + ctx->launch_state->io.normal->listenport[i]; + } + } else { /* user_managed_io is true */ + /* initialize user_managed_io_t */ + ctx->launch_state->io.user = + (user_managed_io_t *)xmalloc(sizeof(user_managed_io_t)); + ctx->launch_state->io.user->connected = 0; + ctx->launch_state->io.user->sockets = + (int *)xmalloc(sizeof(int)*ctx->step_req->num_tasks); } - + launch.num_resp_port = ctx->launch_state->num_resp_port; launch.resp_port = xmalloc(sizeof(uint16_t) * launch.num_resp_port); for (i = 0; i < launch.num_resp_port; i++) { @@ -268,6 +275,24 @@ int slurm_step_launch_wait_start(slurm_step_ctx ctx) } pthread_cond_wait(&sls->cond, &sls->lock); } + + if (sls->user_managed_io) { + while(sls->io.user->connected < sls->tasks_requested) { + if (sls->abort) { + if (!sls->abort_action_taken) { + slurm_kill_job_step( + ctx->job_id, + ctx->step_resp->job_step_id, + SIGKILL); + sls->abort_action_taken = true; + } + pthread_mutex_unlock(&sls->lock); + return 0; + } + pthread_cond_wait(&sls->cond, &sls->lock); + } + } + pthread_mutex_unlock(&sls->lock); return 1; } @@ -322,12 +347,14 @@ void slurm_step_launch_wait_finish(slurm_step_ctx ctx) ctx->job_id, ctx->step_resp->job_step_id, SIGKILL); - client_io_handler_abort(sls->client_io); + if (!sls->user_managed_io) + client_io_handler_abort(sls->io.normal); break; } else if (errnum != 0) { error("Error waiting on condition in" " slurm_step_launch_wait_finish: %m"); - client_io_handler_abort(sls->client_io); + if (!sls->user_managed_io) + client_io_handler_abort(sls->io.normal); break; } } @@ -339,8 +366,10 @@ void slurm_step_launch_wait_finish(slurm_step_ctx ctx) eio_handle_destroy(sls->msg_handle); /* Then wait for the IO thread to finish */ - client_io_handler_finish(sls->client_io); - client_io_handler_destroy(sls->client_io); + if (!sls->user_managed_io) { + client_io_handler_finish(sls->io.normal); + client_io_handler_destroy(sls->io.normal); + } pthread_mutex_unlock(&sls->lock); } @@ -585,14 +614,13 @@ _exit_handler(struct step_launch_state *sls, slurm_msg_t *exit_msg) pthread_mutex_unlock(&sls->lock); } -static void - /* * Take the list of node names of down nodes and convert into an * array of nodeids for the step. The nodeid array is passed to * client_io_handler_downnodes to notify the IO handler to expect no * further IO from that node. 
*/ +static void _node_fail_handler(struct step_launch_state *sls, slurm_msg_t *fail_msg) { srun_node_fail_msg_t *nf = fail_msg->data; @@ -632,7 +660,10 @@ _node_fail_handler(struct step_launch_state *sls, slurm_msg_t *fail_msg) } } - client_io_handler_downnodes(sls->client_io, node_ids, num_node_ids); + if (!sls->user_managed_io) { + client_io_handler_downnodes(sls->io.normal, node_ids, + num_node_ids); + } pthread_cond_signal(&sls->cond); pthread_mutex_unlock(&sls->lock); @@ -642,6 +673,51 @@ _node_fail_handler(struct step_launch_state *sls, slurm_msg_t *fail_msg) hostset_destroy(all_nodes); } +/* + * The TCP connection that was used to send the task_user_managed_io_msg_t + * message will be used as the user managed IO stream. The remote end of the + * TCP stream will be connected to the stdin, stdout, and stderr of the task. + * The local end of the stream is stored in the user_managed_io_t structure, + * and is left to the user to manage (the user can retrieve the array of + * socket descriptors using slurm_step_ctx_get()). + * + * To allow the message TCP stream to be reused for user managed IO traffic + * we set the slurm_msg_t's conn_fd to -1 to avoid having the caller close + * the TCP stream. + */ +static void +_task_user_managed_io_handler(struct step_launch_state *sls, + slurm_msg_t *user_io_msg) +{ + task_user_managed_io_msg_t *msg = + (task_user_managed_io_msg_t *) user_io_msg->data; + + pthread_mutex_lock(&sls->lock); + + debug("task %d user managed io stream established", msg->task_id); + /* sanity check */ + if (msg->task_id >= sls->tasks_requested) { + error("_task_user_managed_io_handler:" + " bad task ID %u (of %d tasks)", + msg->task_id, sls->tasks_requested); + pthread_mutex_unlock(&sls->lock); + return; + } + + sls->io.user->connected++; + fd_set_blocking(user_io_msg->conn_fd); + sls->io.user->sockets[msg->task_id] = user_io_msg->conn_fd; + + /* prevent the caller from closing the user managed IO stream */ + user_io_msg->conn_fd = -1; + + pthread_cond_signal(&sls->cond); + pthread_mutex_unlock(&sls->lock); +} + +/* + * Identify the incoming message and call the appropriate handler function. + */ static void _handle_msg(struct step_launch_state *sls, slurm_msg_t *msg) { @@ -652,7 +728,7 @@ _handle_msg(struct step_launch_state *sls, slurm_msg_t *msg) if ((req_uid != slurm_uid) && (req_uid != 0) && (req_uid != uid)) { error ("Security violation, slurm message from uid %u", (unsigned int) req_uid); - return; + return; } switch (msg->msg_type) { @@ -692,6 +768,10 @@ _handle_msg(struct step_launch_state *sls, slurm_msg_t *msg) slurm_send_rc_msg(msg, rc); slurm_free_get_kvs_msg((kvs_get_msg_t *) msg->data); break; + case TASK_USER_MANAGED_IO_STREAM: + debug2("TASK_USER_MANAGED_IO_STREAM"); + _task_user_managed_io_handler(sls, msg); + break; default: error("received spurious message type: %d", msg->msg_type); diff --git a/src/api/step_launch.h b/src/api/step_launch.h index b95d277b08a09d715c1f2764d47f422926b84838..6cd7dfe34528ae6f2fbe82e260e40779692bb7e2 100644 --- a/src/api/step_launch.h +++ b/src/api/step_launch.h @@ -1,7 +1,7 @@ /*****************************************************************************\ * step_launch.h - launch a parallel job step * - * $Id: spawn.c 7973 2006-05-08 23:52:35Z morrone $ + * $Id$ ***************************************************************************** * Copyright (C) 2006 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
@@ -44,6 +44,11 @@ #include "src/api/step_io.h" +typedef struct { + int connected; + int *sockets; /* array of socket file descriptors */ +} user_managed_io_t; + struct step_launch_state { pthread_mutex_t lock; pthread_cond_t cond; @@ -61,8 +66,12 @@ struct step_launch_state { uint16_t num_resp_port; uint16_t *resp_port; /* array of message response ports */ - /* client side io variables */ - client_io_t *client_io; + /* io variables */ + bool user_managed_io; + union { + client_io_t *normal; + user_managed_io_t *user; + } io; slurm_step_layout_t *layout; /* a pointer into the ctx step_resp, do not free */ diff --git a/src/common/slurm_protocol_defs.c b/src/common/slurm_protocol_defs.c index 44f7b5e71312b77c2e7531d7ed415e8542e33cbc..1ca0d9bf7f8d5bea9bd4bc01bbe0f578ddb2a100 100644 --- a/src/common/slurm_protocol_defs.c +++ b/src/common/slurm_protocol_defs.c @@ -423,32 +423,11 @@ void slurm_free_launch_tasks_request_msg(launch_tasks_request_msg_t * msg) xfree(msg); } -void slurm_free_spawn_task_request_msg(spawn_task_request_msg_t * msg) +void slurm_free_task_user_managed_io_stream_msg(task_user_managed_io_msg_t *msg) { - int i; if (msg == NULL) return; - slurm_cred_destroy(msg->cred); - - if (msg->env) { - for (i = 0; i < msg->envc; i++) { - xfree(msg->env[i]); - } - xfree(msg->env); - } - xfree(msg->cwd); - if (msg->argv) { - for (i = 0; i < msg->argc; i++) { - xfree(msg->argv[i]); - } - xfree(msg->argv); - } - - if (msg->switch_job) - switch_free_jobinfo(msg->switch_job); - xfree(msg->complete_nodelist); - xfree(msg); } @@ -1113,8 +1092,8 @@ extern int slurm_free_msg_data(slurm_msg_type_t type, void *data) case REQUEST_LAUNCH_TASKS: slurm_free_launch_tasks_request_msg(data); break; - case REQUEST_SPAWN_TASK: - slurm_free_spawn_task_request_msg(data); + case TASK_USER_MANAGED_IO_STREAM: + slurm_free_task_user_managed_io_stream_msg(data); break; case REQUEST_SIGNAL_TASKS: case REQUEST_TERMINATE_TASKS: diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h index b1019d4f740e21da28613df3dbbbe7a8cf29b99e..edcad955025aa0556e49305c55392a6d01e4210e 100644 --- a/src/common/slurm_protocol_defs.h +++ b/src/common/slurm_protocol_defs.h @@ -176,8 +176,9 @@ typedef enum { REQUEST_SIGNAL_JOB, REQUEST_TERMINATE_JOB, MESSAGE_EPILOG_COMPLETE, - REQUEST_SPAWN_TASK, + DEFUNCT_REQUEST_SPAWN_TASK, /* DEFUNCT */ REQUEST_FILE_BCAST, + TASK_USER_MANAGED_IO_STREAM, SRUN_PING = 7001, SRUN_TIMEOUT, @@ -387,19 +388,23 @@ typedef struct launch_tasks_request_msg { char *mem_bind; /* binding map for tasks to memory */ uint16_t num_resp_port; uint16_t *resp_port; /* array of available response ports */ - uint16_t num_io_port; - uint16_t *io_port; /* array of available client IO listen ports */ uint16_t task_flags; uint32_t **global_task_ids; slurm_addr orig_addr; /* where message really came from for io */ - /* stdout/err/in per task filenames */ - char *ofname; - char *efname; - char *ifname; - /* buffered stdio flag: 1 for line-buffered, 0 for unbuffered */ - uint8_t buffered_stdio; + uint16_t user_managed_io; /* 0 for "normal" IO, + 1 for "user managed" IO */ + + /********** START "normal" IO only options **********/ + /* These options are ignored if user_managed_io is 1 */ + char *ofname; /* stdout filename pattern */ + char *efname; /* stderr filename pattern */ + char *ifname; /* stdin filename pattern */ + uint8_t buffered_stdio; /* 1 for line-buffered, 0 for unbuffered */ + uint16_t num_io_port; + uint16_t *io_port; /* array of available client IO listen ports */ + /********** END 
"normal" IO only options **********/ char *task_prolog; char *task_epilog; @@ -412,30 +417,9 @@ typedef struct launch_tasks_request_msg { char *complete_nodelist; } launch_tasks_request_msg_t; -typedef struct spawn_task_request_msg { - uint32_t job_id; - uint32_t job_step_id; - uint32_t nnodes; /* number of nodes in this job step */ - uint32_t nprocs; /* number of processes in this job step */ - uint32_t uid; - uint32_t gid; - uint16_t envc; - uint16_t argc; - uint16_t cpus_allocated; - uint16_t multi_prog; - char **env; - char **argv; - char *cwd; - uint16_t io_port; - uint16_t task_flags; - uint32_t global_task_id; - - uint32_t slurmd_debug; /* remote slurmd debug level */ - - slurm_cred_t cred; /* job credential */ - switch_jobinfo_t switch_job; /* switch credential for the job */ - char *complete_nodelist; -} spawn_task_request_msg_t; +typedef struct task_user_managed_io_msg { + uint32_t task_id; +} task_user_managed_io_msg_t; typedef struct partition_info partition_desc_msg_t; @@ -677,7 +661,8 @@ void inline slurm_free_launch_tasks_request_msg(launch_tasks_request_msg_t * msg); void inline slurm_free_launch_tasks_response_msg(launch_tasks_response_msg_t * msg); -void inline slurm_free_spawn_task_request_msg(spawn_task_request_msg_t * msg); +void inline slurm_free_task_user_managed_io_stream_msg( + task_user_managed_io_msg_t *msg); void inline slurm_free_task_exit_msg(task_exit_msg_t * msg); void inline slurm_free_kill_tasks_msg(kill_tasks_msg_t * msg); void inline diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c index 45ce37b94f13c0bbc604f4a31574fab1f23fbb4c..d33e191dd717e23a51d717c4a737146a1c35eaa5 100644 --- a/src/common/slurm_protocol_pack.c +++ b/src/common/slurm_protocol_pack.c @@ -157,10 +157,10 @@ static int _unpack_launch_tasks_request_msg(launch_tasks_request_msg_t ** msg_ptr, Buf buffer); -static void _pack_spawn_task_request_msg(spawn_task_request_msg_t * - msg, Buf buffer); -static int _unpack_spawn_task_request_msg(spawn_task_request_msg_t ** - msg_ptr, Buf buffer); +static void _pack_task_user_managed_io_stream_msg(task_user_managed_io_msg_t * + msg, Buf buffer); +static int _unpack_task_user_managed_io_stream_msg(task_user_managed_io_msg_t ** + msg_ptr, Buf buffer); static void _pack_cancel_tasks_msg(kill_tasks_msg_t * msg, Buf buffer); static int _unpack_cancel_tasks_msg(kill_tasks_msg_t ** msg_ptr, Buf buffer); @@ -495,10 +495,9 @@ pack_msg(slurm_msg_t const *msg, Buf buffer) _pack_launch_tasks_response_msg((launch_tasks_response_msg_t *) msg->data, buffer); break; - case REQUEST_SPAWN_TASK: - _pack_spawn_task_request_msg( - (spawn_task_request_msg_t *) - msg->data, buffer); + case TASK_USER_MANAGED_IO_STREAM: + _pack_task_user_managed_io_stream_msg( + (task_user_managed_io_msg_t *) msg->data, buffer); break; case REQUEST_SIGNAL_TASKS: case REQUEST_TERMINATE_TASKS: @@ -778,10 +777,9 @@ unpack_msg(slurm_msg_t * msg, Buf buffer) (launch_tasks_response_msg_t **) & (msg->data), buffer); break; - case REQUEST_SPAWN_TASK: - rc = _unpack_spawn_task_request_msg( - (spawn_task_request_msg_t **) - & (msg->data), buffer); + case TASK_USER_MANAGED_IO_STREAM: + _unpack_task_user_managed_io_stream_msg( + (task_user_managed_io_msg_t **) &msg->data, buffer); break; case REQUEST_REATTACH_TASKS: rc = _unpack_reattach_tasks_request_msg( @@ -2680,9 +2678,6 @@ _pack_launch_tasks_request_msg(launch_tasks_request_msg_t * msg, Buf buffer) pack16((uint16_t)msg->num_resp_port, buffer); for(i = 0; i < msg->num_resp_port; i++) pack16((uint16_t)msg->resp_port[i], 
buffer); - pack16((uint16_t)msg->num_io_port, buffer); - for(i = 0; i < msg->num_io_port; i++) - pack16((uint16_t)msg->io_port[i], buffer); slurm_pack_slurm_addr(&msg->orig_addr, buffer); packstr_array(msg->env, msg->envc, buffer); packstr(msg->cwd, buffer); @@ -2693,17 +2688,22 @@ _pack_launch_tasks_request_msg(launch_tasks_request_msg_t * msg, Buf buffer) packstr_array(msg->argv, msg->argc, buffer); pack16((uint16_t)msg->task_flags, buffer); pack16((uint16_t)msg->multi_prog, buffer); - packstr(msg->ofname, buffer); - packstr(msg->efname, buffer); - packstr(msg->ifname, buffer); - pack8(msg->buffered_stdio, buffer); + pack16((uint16_t)msg->user_managed_io, buffer); + if (msg->user_managed_io == 0) { + packstr(msg->ofname, buffer); + packstr(msg->efname, buffer); + packstr(msg->ifname, buffer); + pack8(msg->buffered_stdio, buffer); + pack16((uint16_t)msg->num_io_port, buffer); + for(i = 0; i < msg->num_io_port; i++) + pack16((uint16_t)msg->io_port[i], buffer); + } packstr(msg->task_prolog, buffer); packstr(msg->task_epilog, buffer); pack32((uint32_t)msg->slurmd_debug, buffer); switch_pack_jobinfo(msg->switch_job, buffer); job_options_pack(msg->options, buffer); packstr(msg->complete_nodelist, buffer); - } static int @@ -2746,12 +2746,6 @@ _unpack_launch_tasks_request_msg(launch_tasks_request_msg_t ** for (i = 0; i < msg->num_resp_port; i++) safe_unpack16(&msg->resp_port[i], buffer); } - safe_unpack16(&msg->num_io_port, buffer); - if (msg->num_io_port > 0) { - msg->io_port = xmalloc(sizeof(uint16_t)*msg->num_io_port); - for (i = 0; i < msg->num_io_port; i++) - safe_unpack16(&msg->io_port[i], buffer); - } slurm_unpack_slurm_addr_no_alloc(&msg->orig_addr, buffer); safe_unpackstr_array(&msg->env, &msg->envc, buffer); safe_unpackstr_xmalloc(&msg->cwd, &uint16_tmp, buffer); @@ -2762,10 +2756,20 @@ _unpack_launch_tasks_request_msg(launch_tasks_request_msg_t ** safe_unpackstr_array(&msg->argv, &msg->argc, buffer); safe_unpack16(&msg->task_flags, buffer); safe_unpack16(&msg->multi_prog, buffer); - safe_unpackstr_xmalloc(&msg->ofname, &uint16_tmp, buffer); - safe_unpackstr_xmalloc(&msg->efname, &uint16_tmp, buffer); - safe_unpackstr_xmalloc(&msg->ifname, &uint16_tmp, buffer); - safe_unpack8(&msg->buffered_stdio, buffer); + safe_unpack16(&msg->user_managed_io, buffer); + if (msg->user_managed_io == 0) { + safe_unpackstr_xmalloc(&msg->ofname, &uint16_tmp, buffer); + safe_unpackstr_xmalloc(&msg->efname, &uint16_tmp, buffer); + safe_unpackstr_xmalloc(&msg->ifname, &uint16_tmp, buffer); + safe_unpack8(&msg->buffered_stdio, buffer); + safe_unpack16(&msg->num_io_port, buffer); + if (msg->num_io_port > 0) { + msg->io_port = + xmalloc(sizeof(uint16_t) * msg->num_io_port); + for (i = 0; i < msg->num_io_port; i++) + safe_unpack16(&msg->io_port[i], buffer); + } + } safe_unpackstr_xmalloc(&msg->task_prolog, &uint16_tmp, buffer); safe_unpackstr_xmalloc(&msg->task_epilog, &uint16_tmp, buffer); safe_unpack32(&msg->slurmd_debug, buffer); @@ -2791,71 +2795,29 @@ unpack_error: } static void -_pack_spawn_task_request_msg(spawn_task_request_msg_t * msg, Buf buffer) +_pack_task_user_managed_io_stream_msg(task_user_managed_io_msg_t * msg, + Buf buffer) { xassert(msg != NULL); - pack32((uint32_t)msg->job_id, buffer); - pack32((uint32_t)msg->job_step_id, buffer); - pack32((uint32_t)msg->nnodes, buffer); - pack32((uint32_t)msg->nprocs, buffer); - pack32((uint32_t)msg->uid, buffer); - pack32((uint32_t)msg->gid, buffer); - slurm_cred_pack(msg->cred, buffer); - packstr_array(msg->env, msg->envc, buffer); - packstr(msg->cwd, 
buffer); - packstr_array(msg->argv, msg->argc, buffer); - pack16((uint16_t)msg->io_port, buffer); - pack16((uint16_t)msg->task_flags, buffer); - pack16((uint16_t)msg->cpus_allocated, buffer); - pack16((uint16_t)msg->multi_prog, buffer); - pack32((uint32_t)msg->slurmd_debug, buffer); - pack32((uint32_t)msg->global_task_id, buffer); - switch_pack_jobinfo(msg->switch_job, buffer); - packstr(msg->complete_nodelist, buffer); + pack32(msg->task_id, buffer); } static int -_unpack_spawn_task_request_msg(spawn_task_request_msg_t ** - msg_ptr, Buf buffer) +_unpack_task_user_managed_io_stream_msg(task_user_managed_io_msg_t **msg_ptr, + Buf buffer) { - uint16_t uint16_tmp; - spawn_task_request_msg_t *msg; + task_user_managed_io_msg_t *msg; xassert(msg_ptr != NULL); - msg = xmalloc(sizeof(spawn_task_request_msg_t)); + msg = xmalloc(sizeof(task_user_managed_io_msg_t)); *msg_ptr = msg; - safe_unpack32(&msg->job_id, buffer); - safe_unpack32(&msg->job_step_id, buffer); - safe_unpack32(&msg->nnodes, buffer); - safe_unpack32(&msg->nprocs, buffer); - safe_unpack32(&msg->uid, buffer); - safe_unpack32(&msg->gid, buffer); - if (!(msg->cred = slurm_cred_unpack(buffer))) - goto unpack_error; - safe_unpackstr_array(&msg->env, &msg->envc, buffer); - safe_unpackstr_xmalloc(&msg->cwd, &uint16_tmp, buffer); - safe_unpackstr_array(&msg->argv, &msg->argc, buffer); - safe_unpack16(&msg->io_port, buffer); - safe_unpack16(&msg->task_flags, buffer); - safe_unpack16(&msg->cpus_allocated, buffer); - safe_unpack16(&msg->multi_prog, buffer); - safe_unpack32(&msg->slurmd_debug, buffer); - safe_unpack32(&msg->global_task_id, buffer); - - switch_alloc_jobinfo(&msg->switch_job); - if (switch_unpack_jobinfo(msg->switch_job, buffer) < 0) { - error("switch_unpack_jobinfo: %m"); - switch_free_jobinfo(msg->switch_job); - goto unpack_error; - } - - safe_unpackstr_xmalloc(&msg->complete_nodelist, &uint16_tmp, buffer); + safe_unpack32(&msg->task_id, buffer); return SLURM_SUCCESS; unpack_error: - slurm_free_spawn_task_request_msg(msg); + slurm_free_task_user_managed_io_stream_msg(msg); *msg_ptr = NULL; return SLURM_ERROR; } diff --git a/src/slurmd/common/slurmstepd_init.h b/src/slurmd/common/slurmstepd_init.h index 2636e4448c6834687b7d12d309ac63a04585d36c..430befa669b61d3e5014ddbe4bbc6f5c77ee9c7c 100644 --- a/src/slurmd/common/slurmstepd_init.h +++ b/src/slurmd/common/slurmstepd_init.h @@ -50,7 +50,7 @@ typedef enum slurmd_step_tupe { LAUNCH_BATCH_JOB = 0, LAUNCH_TASKS, - SPAWN_TASKS + DEFUNCT_SPAWN_TASKS /* DEFUNCT */ } slurmd_step_type_t; /* diff --git a/src/slurmd/slurmd/req.c b/src/slurmd/slurmd/req.c index 96cdd6ea82082177d1b913294758a9d07eed3b1d..c1ca177e03900c603279d970c9aeeda1238a9128 100644 --- a/src/slurmd/slurmd/req.c +++ b/src/slurmd/slurmd/req.c @@ -100,7 +100,6 @@ static bool _job_still_running(uint32_t job_id); static int _kill_all_active_steps(uint32_t jobid, int sig, bool batch); static int _terminate_all_steps(uint32_t jobid, bool batch); static void _rpc_launch_tasks(slurm_msg_t *); -static void _rpc_spawn_task(slurm_msg_t *); static void _rpc_batch_job(slurm_msg_t *); static void _rpc_signal_tasks(slurm_msg_t *); static void _rpc_terminate_tasks(slurm_msg_t *); @@ -159,13 +158,6 @@ slurmd_req(slurm_msg_t *msg) slurm_free_launch_tasks_request_msg(msg->data); slurm_mutex_unlock(&launch_mutex); break; - case REQUEST_SPAWN_TASK: - debug2("Processing RPC: REQUEST_SPAWN_TASK"); - slurm_mutex_lock(&launch_mutex); - _rpc_spawn_task(msg); - slurm_free_spawn_task_request_msg(msg->data); - slurm_mutex_unlock(&launch_mutex); - break; 
case REQUEST_SIGNAL_TASKS: debug2("Processing RPC: REQUEST_SIGNAL_TASKS"); _rpc_signal_tasks(msg); @@ -376,15 +368,6 @@ _send_slurmstepd_init(int fd, slurmd_step_type_t type, void *req, uid = (uid_t)((launch_tasks_request_msg_t *)req)->uid; msg.msg_type = REQUEST_LAUNCH_TASKS; break; - case SPAWN_TASKS: - /* - * The validity of req->uid was verified against the - * auth credential in _rpc_spawn_task(). req->gid - * has NOT yet been checked! - */ - uid = (uid_t)((spawn_task_request_msg_t *)req)->uid; - msg.msg_type = REQUEST_SPAWN_TASK; - break; default: error("Was sent a task I didn't understand"); break; @@ -735,91 +718,6 @@ _rpc_launch_tasks(slurm_msg_t *msg) send_registration_msg(errnum, false); } - -static void -_rpc_spawn_task(slurm_msg_t *msg) -{ - int errnum = 0; - uint16_t port; - char host[MAXHOSTNAMELEN]; - uid_t req_uid; - spawn_task_request_msg_t *req = msg->data; - uint32_t jobid = req->job_id; - uint32_t stepid = req->job_step_id; - bool super_user = false; - slurm_addr self; - slurm_addr *cli = &msg->orig_addr; - socklen_t adlen; - int spawn_tasks_to_launch = -1; - hostset_t step_hset = NULL; - - req_uid = g_slurm_auth_get_uid(msg->auth_cred); - - super_user = _slurm_authorized_user(req_uid); - - if ((super_user == false) && (req_uid != req->uid)) { - error("spawn task request from uid %u", - (unsigned int) req_uid); - errnum = ESLURM_USER_ID_MISSING; /* or invalid user */ - goto done; - } - - slurm_get_ip_str(cli, &port, host, sizeof(host)); - info("spawn task %u.%u request from %u@%s", req->job_id, - req->job_step_id, req->uid, host); - - if (_check_job_credential(req->cred, jobid, stepid, req_uid, - spawn_tasks_to_launch, &step_hset) < 0) { - errnum = ESLURMD_INVALID_JOB_CREDENTIAL; - error("Invalid job credential from %ld@%s: %m", - (long) req_uid, host); - goto done; - } - if (slurm_cred_revoked(conf->vctx, req->cred)) { - info("Job credential revoked for %u", jobid); - errnum = ESLURMD_CREDENTIAL_REVOKED; - goto done; - } - -#ifndef HAVE_FRONT_END - if (!slurm_cred_jobid_cached(conf->vctx, req->job_id)) { - slurm_cred_insert_jobid(conf->vctx, req->job_id); - if (_run_prolog(req->job_id, req->uid, NULL) != 0) { - error("[job %u] prolog failed", req->job_id); - errnum = ESLURMD_PROLOG_FAILED; - goto done; - } - } -#endif - - adlen = sizeof(self); - _slurm_getsockname(msg->conn_fd, (struct sockaddr *)&self, &adlen); - - errnum = _forkexec_slurmstepd(SPAWN_TASKS, (void *)req, cli, &self, - step_hset); - - done: - if (step_hset) - hostset_destroy(step_hset); - if (slurm_send_rc_msg(msg, errnum) < 0) { - - error("spawn_task: unable to send return code: %m"); - - /* - * Rewind credential so that srun may perform retry - */ - slurm_cred_rewind(conf->vctx, req->cred); /* ignore errors */ - - } else if (errnum == SLURM_SUCCESS) - save_cred_state(conf->vctx); - - /* - * If job prolog failed, indicate failure to slurmctld - */ - if (errnum == ESLURMD_PROLOG_FAILED) - send_registration_msg(errnum, false); -} - static void _prolog_error(batch_job_launch_msg_t *req, int rc) { diff --git a/src/slurmd/slurmstepd/io.c b/src/slurmd/slurmstepd/io.c index a6bc7a2f0dd8402d3295b5cd160d4fb4fb8d6be7..f6a175227fa02a65871c3a47a2bbce4cb0cdfe20 100644 --- a/src/slurmd/slurmstepd/io.c +++ b/src/slurmd/slurmstepd/io.c @@ -1311,3 +1311,58 @@ _outgoing_buf_free(slurmd_job_t *job) return false; } + +/********************************************************************** + * Functions specific to "user managed" IO + **********************************************************************/ +static int 
+_user_managed_io_connect(srun_info_t *srun, uint32_t gtid) +{ + int fd; + task_user_managed_io_msg_t user_io_msg; + slurm_msg_t msg; + + slurm_msg_t_init(&msg); + msg.msg_type = TASK_USER_MANAGED_IO_STREAM; + msg.data = &user_io_msg; + user_io_msg.task_id = gtid; + + fd = slurm_open_msg_conn(&srun->resp_addr); + if (fd == -1) + return -1; + + if (slurm_send_node_msg(fd, &msg) == -1) { + close(fd); + return -1; + } + return fd; +} + +/* + * This function sets the close-on-exec flag on the socket descriptor. + * io_dup_stdio will remove the close-on-exec flag for just one task's + * file descriptors. + */ +int +user_managed_io_client_connect(int ntasks, srun_info_t *srun, + slurmd_task_info_t **tasks) +{ + int fd; + int i; + + for (i = 0; i < ntasks; i++) { + fd = _user_managed_io_connect(srun, tasks[i]->gtid); + if (fd == -1) + return SLURM_ERROR; + fd_set_close_on_exec(fd); + tasks[i]->stdin_fd = fd; + tasks[i]->to_stdin = -1; + tasks[i]->stdout_fd = fd; + tasks[i]->from_stdout = -1; + tasks[i]->stderr_fd = fd; + tasks[i]->from_stderr = -1; + } + + return SLURM_SUCCESS; +} + diff --git a/src/slurmd/slurmstepd/io.h b/src/slurmd/slurmstepd/io.h index 11436051fc55763e3f402bf6357a2d042caf7c1f..1eb69cde30efbc6de844f32e6898e00fb111c2b8 100644 --- a/src/slurmd/slurmstepd/io.h +++ b/src/slurmd/slurmstepd/io.h @@ -101,4 +101,11 @@ void io_close_task_fds(slurmd_job_t *job); void io_close_all(slurmd_job_t *job); +/* + * Initialize "user managed" IO, where each task has a single TCP + * socket end point shared on stdin, stdout, and stderr. + */ +int user_managed_io_client_connect(int ntasks, srun_info_t *srun, + slurmd_task_info_t **tasks); + #endif /* !_IO_H */ diff --git a/src/slurmd/slurmstepd/mgr.c b/src/slurmd/slurmstepd/mgr.c index e98bb10b141d6edfc8824424dce9cd3b2c44cad0..888aab2bb3164077dee6d2e793d9dc4fabfabcbe 100644 --- a/src/slurmd/slurmstepd/mgr.c +++ b/src/slurmd/slurmstepd/mgr.c @@ -155,8 +155,7 @@ static int _fork_all_tasks(slurmd_job_t *job); static int _become_user(slurmd_job_t *job, struct priv_state *ps); static void _set_prio_process (slurmd_job_t *job); static void _set_job_log_prefix(slurmd_job_t *job); -static int _setup_io(slurmd_job_t *job); -static int _setup_spawn_io(slurmd_job_t *job); +static int _setup_normal_io(slurmd_job_t *job); static int _drop_privileges(slurmd_job_t *job, bool do_setuid, struct priv_state *state); static int _reclaim_privileges(struct priv_state *state); @@ -201,7 +200,7 @@ mgr_launch_tasks_setup(launch_tasks_request_msg_t *msg, slurm_addr *cli, { slurmd_job_t *job = NULL; - if (!(job = job_create(msg, cli))) { + if (!(job = job_create(msg))) { _send_launch_failure (msg, cli, errno); return NULL; } @@ -299,29 +298,6 @@ mgr_launch_batch_job_cleanup(slurmd_job_t *job, int rc) _batch_cleanup(job, 2, rc); } -/* - * Spawn a task / job step on the current node - */ -slurmd_job_t * -mgr_spawn_task_setup(spawn_task_request_msg_t *msg, slurm_addr *cli, - slurm_addr *self) -{ - slurmd_job_t *job = NULL; - - if (!(job = job_spawn_create(msg, cli))) - return NULL; - - job->spawn_task = true; - _set_job_log_prefix(job); - - _setargs(job); - - job->envtp->cli = cli; - job->envtp->self = self; - - return job; -} - static void _set_job_log_prefix(slurmd_job_t *job) { @@ -339,13 +315,12 @@ _set_job_log_prefix(slurmd_job_t *job) } static int -_setup_io(slurmd_job_t *job) +_setup_normal_io(slurmd_job_t *job) { int rc = 0; struct priv_state sprivs; - debug2("Entering _setup_io"); - + debug2("Entering _setup_normal_io"); /* * Temporarily drop permissions, 
initialize task stdio file @@ -377,39 +352,23 @@ _setup_io(slurmd_job_t *job) if (!job->batch) if (io_thread_start(job) < 0) return ESLURMD_IO_ERROR; - /* - * Initialize log facility to copy errors back to srun - */ - _slurmd_job_log_init(job); - -#ifndef NDEBUG -# ifdef PR_SET_DUMPABLE - if (prctl(PR_SET_DUMPABLE, 1) < 0) - debug ("Unable to set dumpable to 1"); -# endif /* PR_SET_DUMPABLE */ -#endif /* !NDEBUG */ - - debug2("Leaving _setup_io"); + debug2("Leaving _setup_normal_io"); return SLURM_SUCCESS; } - static int -_setup_spawn_io(slurmd_job_t *job) +_setup_user_managed_io(slurmd_job_t *job) { - _slurmd_job_log_init(job); + srun_info_t *srun; -#ifndef NDEBUG -# ifdef PR_SET_DUMPABLE - if (prctl(PR_SET_DUMPABLE, 1) < 0) - debug ("Unable to set dumpable to 1"); -# endif /* PR_SET_DUMPABLE */ -#endif /* !NDEBUG */ - - return SLURM_SUCCESS; + if ((srun = list_peek(job->sruns)) == NULL) { + error("_setup_user_managed_io: no clients!"); + return SLURM_ERROR; + } + + return user_managed_io_client_connect(job->ntasks, srun, job->task); } - static void _random_sleep(slurmd_job_t *job) { @@ -687,10 +646,22 @@ job_manager(slurmd_job_t *job) goto fail1; } - if (job->spawn_task) - rc = _setup_spawn_io(job); + if (job->user_managed_io) + rc = _setup_user_managed_io(job); else - rc = _setup_io(job); + rc = _setup_normal_io(job); + /* + * Initialize log facility to copy errors back to srun + */ + _slurmd_job_log_init(job); + +#ifndef NDEBUG +# ifdef PR_SET_DUMPABLE + if (prctl(PR_SET_DUMPABLE, 1) < 0) + debug ("Unable to set dumpable to 1"); +# endif /* PR_SET_DUMPABLE */ +#endif /* !NDEBUG */ + if (rc) { error("IO setup failed: %m"); goto fail2; @@ -758,7 +729,7 @@ job_manager(slurmd_job_t *job) /* * Wait for io thread to complete (if there is one) */ - if (!job->batch && !job->spawn_task && io_initialized) { + if (!job->batch && !job->user_managed_io && io_initialized) { eio_signal_shutdown(job->eio); _wait_for_io(job); } @@ -1176,7 +1147,7 @@ _wait_for_all_tasks(slurmd_job_t *job) * Make sure all processes in session are dead for interactive jobs. On * systems with an IBM Federation switch, all processes must be terminated * before the switch window can be released by interconnect_postfini(). - * For batch jobs, we let spawned processes continue by convention + * For batch jobs, we let spawned processes continue by convention * (although this could go either way). The Epilog program could be used * to terminate any "orphan" processes. 
  */
@@ -1363,7 +1334,7 @@ _send_launch_resp(slurmd_job_t *job, int rc)
 	launch_tasks_response_msg_t resp;
 	srun_info_t *srun = list_peek(job->sruns);
 
-	if (job->batch || job->spawn_task)
+	if (job->batch)
 		return;
 
 	debug("Sending launch resp rc=%d", rc);
@@ -1504,8 +1475,7 @@ _slurmd_job_log_init(slurmd_job_t *job)
 {
 	char argv0[64];
 
-	if (!job->spawn_task)
-		conf->log_opts.buffered = 1;
+	conf->log_opts.buffered = 1;
 
 	/*
 	 * Reset stderr logging to user requested level
@@ -1529,7 +1499,7 @@ _slurmd_job_log_init(slurmd_job_t *job)
 	log_set_argv0(argv0);
 
 	/* Connect slurmd stderr to job's stderr */
-	if ((!job->spawn_task) && (job->task != NULL)) {
+	if (!job->user_managed_io && job->task != NULL) {
 		if (dup2(job->task[0]->stderr_fd, STDERR_FILENO) < 0) {
 			error("job_log_init: dup2(stderr): %m");
 			return;
diff --git a/src/slurmd/slurmstepd/mgr.h b/src/slurmd/slurmstepd/mgr.h
index bcb4c859af00a6f85a0b165ccc8c7860a2c8bb5e..2dd4f39eca3cd67e7b1e9254418f7381a555ab72 100644
--- a/src/slurmd/slurmstepd/mgr.h
+++ b/src/slurmd/slurmstepd/mgr.h
@@ -46,12 +46,6 @@
 #include "src/slurmd/slurmd/slurmd.h"
 #include "src/slurmd/slurmstepd/slurmstepd_job.h"
 
-/*
- * Initialize a slurmd_job_t structure for a spawn task
- */
-slurmd_job_t *mgr_spawn_task_setup(spawn_task_request_msg_t *msg,
-				   slurm_addr *client, slurm_addr *self);
-
 /*
  * Initialize a slurmd_job_t structure for a launch tasks
  */
diff --git a/src/slurmd/slurmstepd/slurmstepd.c b/src/slurmd/slurmstepd/slurmstepd.c
index c022c0242264eadde9dd178ea694377e9533bbd8..09d3d189e1d72a9466b6062c6532d6577f2dc99b 100644
--- a/src/slurmd/slurmstepd/slurmstepd.c
+++ b/src/slurmd/slurmstepd/slurmstepd.c
@@ -286,11 +286,8 @@ _init_from_slurmd(int sock, char **argv,
 	case LAUNCH_TASKS:
 		msg->msg_type = REQUEST_LAUNCH_TASKS;
 		break;
-	case SPAWN_TASKS:
-		msg->msg_type = REQUEST_SPAWN_TASK;
-		break;
 	default:
-		fatal("Unrecognized launch/spawn RPC");
+		fatal("Unrecognized launch RPC");
 		break;
 	}
 	if(unpack_msg(msg, buffer) == SLURM_ERROR)
@@ -338,12 +335,8 @@ _step_setup(slurm_addr *cli, slurm_addr *self, slurm_msg_t *msg)
 		debug2("setup for a launch_task");
 		job = mgr_launch_tasks_setup(msg->data, cli, self);
 		break;
-	case REQUEST_SPAWN_TASK:
-		debug2("setup for a spawn_task");
-		job = mgr_spawn_task_setup(msg->data, cli, self);
-		break;
 	default:
-		fatal("handle_launch_message: Unrecognized launch/spawn RPC");
+		fatal("handle_launch_message: Unrecognized launch RPC");
 		break;
 	}
 	if(!job) {
@@ -375,11 +368,8 @@ _step_cleanup(slurmd_job_t *job, slurm_msg_t *msg, int rc)
 	case REQUEST_LAUNCH_TASKS:
 		slurm_free_launch_tasks_request_msg(msg->data);
 		break;
-	case REQUEST_SPAWN_TASK:
-		slurm_free_spawn_task_request_msg(msg->data);
-		break;
 	default:
-		fatal("handle_launch_message: Unrecognized launch/spawn RPC");
+		fatal("handle_launch_message: Unrecognized launch RPC");
 		break;
 	}
 	jobacct_g_free(step_complete.jobacct);
diff --git a/src/slurmd/slurmstepd/slurmstepd_job.c b/src/slurmd/slurmstepd/slurmstepd_job.c
index 28bddef37247c53c95ef9202b59081c16cc74853..4db41e6f6038dda235a20ab61ddf7e96bcea6c52 100644
--- a/src/slurmd/slurmstepd/slurmstepd_job.c
+++ b/src/slurmd/slurmstepd/slurmstepd_job.c
@@ -146,7 +146,7 @@ _valid_gid(struct passwd *pwd, gid_t *gid)
 
 /* create a slurmd job structure from a launch tasks message */
 slurmd_job_t *
-job_create(launch_tasks_request_msg_t *msg, slurm_addr *cli_addr)
+job_create(launch_tasks_request_msg_t *msg)
 {
 	struct passwd *pwd = NULL;
 	slurmd_job_t *job = NULL;
@@ -225,10 +225,13 @@
 	slurm_set_addr(&resp_addr,
 		       msg->resp_port[nodeid % msg->num_resp_port],
 		       NULL);
-	memcpy(&io_addr, &msg->orig_addr, sizeof(slurm_addr));
-	slurm_set_addr(&io_addr,
-		       msg->io_port[nodeid % msg->num_io_port],
-		       NULL);
+	job->user_managed_io = msg->user_managed_io;
+	if (!msg->user_managed_io) {
+		memcpy(&io_addr, &msg->orig_addr, sizeof(slurm_addr));
+		slurm_set_addr(&io_addr,
+			       msg->io_port[nodeid % msg->num_io_port],
+			       NULL);
+	}
 
 	srun = srun_info_create(msg->cred, &resp_addr, &io_addr);
 
@@ -259,94 +262,6 @@
 	return job;
 }
 
-/* create a slurmd job structure from a spawn task message.
- * NOTE: gid field in spawn_task_request_msg_t is not used. */
-slurmd_job_t *
-job_spawn_create(spawn_task_request_msg_t *msg, slurm_addr *cli_addr)
-{
-	struct passwd *pwd;
-	slurmd_job_t *job;
-	srun_info_t *srun;
-	slurm_addr io_addr;
-	int nodeid = NO_VAL;
-
-	xassert(msg != NULL);
-	xassert(msg->complete_nodelist != NULL);
-
-	debug3("entering job_spawn_create");
-
-	if ((pwd = _pwd_create((uid_t)msg->uid)) == NULL) {
-		error("uid %ld not found on system", (long) msg->uid);
-		slurm_seterrno (ESLURMD_UID_NOT_FOUND);
-		return NULL;
-	}
-	job = xmalloc(sizeof(slurmd_job_t));
-#ifndef HAVE_FRONT_END
-	nodeid = nodelist_find(msg->complete_nodelist, conf->node_name);
-	job->node_name = xstrdup(conf->node_name);
-#else
-	nodeid = 0;
-	job->node_name = xstrdup(msg->complete_nodelist);
-#endif
-	if(nodeid < 0) {
-		error("couldn't find node %s in %s",
-		      job->node_name, msg->complete_nodelist);
-		job_destroy(job);
-		return NULL;
-	}
-	job->state = SLURMSTEPD_STEP_STARTING;
-	job->pwd = pwd;
-	job->ntasks = 1;	/* tasks to launch always one */
-	job->nprocs = msg->nprocs;
-	job->jobid = msg->job_id;
-	job->stepid = msg->job_step_id;
-	job->spawn_task = true;
-
-	job->uid = (uid_t) msg->uid;
-	job->gid = job->pwd->pw_gid;
-	job->cwd = xstrdup(msg->cwd);
-
-	job->env = _array_copy(msg->envc, msg->env);
-	job->eio = eio_handle_create();
-	job->sruns = list_create((ListDelF) _srun_info_destructor);
-	job->envtp = xmalloc(sizeof(env_t));
-	job->envtp->jobid = -1;
-	job->envtp->stepid = -1;
-	job->envtp->procid = -1;
-	job->envtp->localid = -1;
-	job->envtp->nodeid = -1;
-	job->envtp->cpu_bind_type = 0;
-	job->envtp->cpu_bind = NULL;
-	job->envtp->mem_bind_type = 0;
-	job->envtp->mem_bind = NULL;
-
-	memcpy(&io_addr, cli_addr, sizeof(slurm_addr));
-	slurm_set_addr(&io_addr, msg->io_port, NULL);
-
-	srun = srun_info_create(msg->cred, NULL, &io_addr);
-
-	job->argc = msg->argc;
-	job->argv = _array_copy(job->argc, msg->argv);
-
-	job->nnodes = msg->nnodes;
-	job->nodeid = nodeid;
-	job->debug = msg->slurmd_debug;
-	job->cpus = msg->cpus_allocated;
-	job->multi_prog = msg->multi_prog;
-	job->timelimit = (time_t) -1;
-	job->task_flags = msg->task_flags;
-	job->switch_job = msg->switch_job;
-
-	list_append(job->sruns, (void *) srun);
-
-	job->task = (slurmd_task_info_t **)
-		xmalloc(sizeof(slurmd_task_info_t *));
-	job->task[0] = task_info_create(0, msg->global_task_id,
-					NULL, NULL, NULL);
-
-	return job;
-}
-
 /*
  * return the default output filename for a batch job
  */
diff --git a/src/slurmd/slurmstepd/slurmstepd_job.h b/src/slurmd/slurmstepd/slurmstepd_job.h
index 1b119a31166c88855d7d0a1baef25c5f40443d01..fd89458d714c2cefe1663a39d2e792018323dc29 100644
--- a/src/slurmd/slurmstepd/slurmstepd_job.h
+++ b/src/slurmd/slurmstepd/slurmstepd_job.h
@@ -68,7 +68,9 @@ typedef struct srun_key {
 
 typedef struct srun_info {
 	srun_key_t *key;	   /* srun key for IO verification      */
 	slurm_addr resp_addr;	   /* response addr for task exit msg   */
-	slurm_addr ioaddr;	   /* Address to connect on for I/O     */
+	slurm_addr ioaddr;	   /* Address to connect on for normal I/O.
+				      User-managed IO instead sends messages
+				      to the normal resp_addr. */
 } srun_info_t;
 
 typedef enum task_state {
@@ -131,7 +133,7 @@ typedef struct slurmd_job {
 	gid_t	*gids;	/* array of gids for user specified in uid */
 	bool	batch;	/* true if this is a batch job */
 	bool	run_prolog;	/* true if need to run prolog */
-	bool	spawn_task;	/* stand-alone task */
+	bool	user_managed_io; /* true if srun manages the IO sockets */
 	time_t	timelimit;	/* time at which job must stop */
 	char	*task_prolog;	/* per-task prolog */
 	char	*task_epilog;	/* per-task epilog */
@@ -187,9 +189,8 @@
 
 } slurmd_job_t;
 
-slurmd_job_t * job_create(launch_tasks_request_msg_t *msg, slurm_addr *client);
+slurmd_job_t * job_create(launch_tasks_request_msg_t *msg);
 slurmd_job_t * job_batch_job_create(batch_job_launch_msg_t *msg);
-slurmd_job_t * job_spawn_create(spawn_task_request_msg_t *msg, slurm_addr *client);
 
 void job_kill(slurmd_job_t *job, int signal);
diff --git a/src/slurmd/slurmstepd/task.c b/src/slurmd/slurmstepd/task.c
index 22ae748d30c763656365ee89ab82aaeef4a55e3d..227bcd1e607c8d9048553c0e9fd7feae1fe5eafe 100644
--- a/src/slurmd/slurmstepd/task.c
+++ b/src/slurmd/slurmstepd/task.c
@@ -90,41 +90,10 @@
  * Static prototype definitions.
  */
 static void  _make_tmpdir(slurmd_job_t *job);
-static void  _setup_spawn_io(slurmd_job_t *job);
 static int   _run_script(const char *name, const char *path,
 			 slurmd_job_t *job);
 static void  _update_env(char *buf, char ***env);
 
-
-static void _setup_spawn_io(slurmd_job_t *job)
-{
-	srun_info_t *srun;
-	int fd = -1;
-
-	srun = list_peek(job->sruns);
-	xassert(srun);
-	if ((fd = (int) slurm_open_stream(&srun->ioaddr)) < 0) {
-		error("connect spawn io stream: %m");
-		exit(1);
-	}
-
-	if (dup2(fd, STDIN_FILENO) == -1) {
-		error("dup2 over STDIN_FILENO: %m");
-		exit(1);
-	}
-	if (dup2(fd, STDOUT_FILENO) == -1) {
-		error("dup2 over STDOUT_FILENO: %m");
-		exit(1);
-	}
-	if (dup2(fd, STDERR_FILENO) == -1) {
-		error("dup2 over STDERR_FILENO: %m");
-		exit(1);
-	}
-
-	if (fd > 2)
-		(void) close(fd);
-}
-
 /* Search for "export NAME=value" records in buf and
  * use them to add environment variables to env
  */
@@ -247,7 +216,7 @@ exec_task(slurmd_job_t *job, int i, int waitfd)
 	slurmd_task_info_t *t = NULL;
 
-	if ((!job->spawn_task) && (set_user_limits(job) < 0)) {
+	if (set_user_limits(job) < 0) {
 		debug("Unable to set user limits");
 		log_fini();
 		exit(5);
@@ -309,10 +278,7 @@ exec_task(slurmd_job_t *job, int i, int waitfd)
 	 * If io_prepare_child() is moved above interconnect_attach()
 	 * this causes EBADF from qsw_attach(). Why?
 	 */
-	if (job->spawn_task)
-		_setup_spawn_io(job);
-	else
-		io_dup_stdio(job->task[i]);
+	io_dup_stdio(job->task[i]);
 
 	/* task-specific pre-launch activities */
diff --git a/testsuite/expect/README b/testsuite/expect/README
index 31bd31dae9e244f701eb7550a48bc097fe92cb0c..f63700963be5ce31affa6d0fd555ed6141305310 100644
--- a/testsuite/expect/README
+++ b/testsuite/expect/README
@@ -248,7 +248,8 @@ test7.1  Test priorities slurmctld assigns to jobs. Uses srun --hold
          and --batch options.
 test7.2  Test of PMI functions available via API library. Uses srun
          and slaunch. Tests --pmi-threads option in both commands.
-test7.3  Test of slurm_spawn API (needed on IBM SP systems).
+test7.3  Test of slurm_step_launch API with user_managed_io=true
+         (needed by poe on IBM AIX systems).
 test7.4  Test of TotalView operation with srun, with and without bulk
          transfer.
 test7.5  Test of TotalView operation with slaunch, with and without bulk
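Condensed from the rewritten test7.3.prog.c later in this patch, the client-side calling sequence for user-managed IO comes down to the sketch below (allocation and step-context creation are assumed to have already happened, and error handling is trimmed; this is a summary of the test program, not a separate API):

/* Sketch: launch a step with user-managed IO and collect the sockets.
 * Assumes "ctx" came from slurm_step_ctx_create() and "task_argv"
 * names the task binary.  Condensed from test7.3.prog.c below. */
slurm_job_step_launch_t launch;
int num_fd = 0;
int *fd_array = NULL;

slurm_job_step_launch_t_init(&launch);
launch.argc = 1;
launch.argv = task_argv;
launch.user_managed_io = true;	/* one TCP stream per task */

if (slurm_step_launch(ctx, &launch, NULL) == SLURM_SUCCESS &&
    slurm_step_launch_wait_start(ctx)) {
	/* One connected socket per task, indexed by global task id */
	slurm_step_ctx_get(ctx, SLURM_STEP_CTX_USER_MANAGED_SOCKETS,
			   &num_fd, &fd_array);
}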
diff --git a/testsuite/expect/test7.2 b/testsuite/expect/test7.2
index 9c03ad68e95b2448206d1e1afd321ed52e76b7cf..ea5b7807b59e44759f7121b3e0943c0a8e5d6fdf 100755
--- a/testsuite/expect/test7.2
+++ b/testsuite/expect/test7.2
@@ -45,7 +45,7 @@ exec $bin_rm -f $file_prog_get
 
 if {![test_aix]} {
 	exec $bin_cc ${file_prog_get}.c -g -pthread -o $file_prog_get -I${slurm_dir}/include -Wl,--rpath=${slurm_dir}/lib -L${slurm_dir}/lib -lpmi -lslurm
 } else {
-	exec $bin_cc ${file_prog_get}.c -g -pthread -o $file_prog_get -I${slurm_dir}/include -L${slurm_dir}/lib -lpmi -lslurm
+	exec $bin_cc ${file_prog_get}.c -Wl,-brtl -g -pthread -o $file_prog_get -I${slurm_dir}/include -L${slurm_dir}/lib -lpmi -lslurm
 }
 exec $bin_chmod 700 $file_prog_get
diff --git a/testsuite/expect/test7.3 b/testsuite/expect/test7.3
index 90c9b6c94efb3702bd643ba032a1d8d0654f68d5..6e7356bb84f5d7a284d5ec4f9f886911f2150601 100755
--- a/testsuite/expect/test7.3
+++ b/testsuite/expect/test7.3
@@ -37,7 +37,6 @@ set test_id     "7.3"
 set exit_code   0
 set io_prog     "test$test_id.io"
 set test_prog   "test$test_id.prog"
-set test_out    "test$test_id.out"
 
 print_header $test_id
 
@@ -49,15 +48,17 @@ if {[test_front_end] != 0} {
 #
 # Delete left-over programs and rebuild them
 #
-exec $bin_rm -f $io_prog $test_prog $test_out
+file delete $io_prog $test_prog
 exec $bin_make -f /dev/null $io_prog
 exec $bin_chmod 700 $io_prog
 
+send_user "slurm_dir is $slurm_dir\n"
 if {![test_aix]} {
 	exec $bin_cc ${test_prog}.c -g -pthread -o ${test_prog} -I${slurm_dir}/include -Wl,--rpath=${slurm_dir}/lib -L${slurm_dir}/lib -lslurm
 } else {
-	exec $bin_cc ${test_prog}.c -g -pthread -o ${test_prog} -I${slurm_dir}/include -L${slurm_dir}/lib -lslurm
+	send_user "$bin_cc ${test_prog}.c -Wl,-brtl -g -pthread -o ${test_prog} -I${slurm_dir}/include -L${slurm_dir}/lib -lslurm\n"
+	exec $bin_cc ${test_prog}.c -Wl,-brtl -g -pthread -o ${test_prog} -I${slurm_dir}/include -L${slurm_dir}/lib -lslurm
 }
 exec $bin_chmod 700 $test_prog
 
@@ -73,9 +74,7 @@ set job_id   0
 set matches  0
 set task_cnt 1
 # Usage: test7.3.prog [min_nodes] [max_nodes] [tasks]
-catch {exec ./$test_prog 1 2 >$test_out}
-exec $bin_touch $test_out
-spawn $bin_cat $test_out
+spawn ./$test_prog 1 2
 expect {
 	-re "job_id ($number)" {
 		set job_id $expect_out(1,string)
@@ -126,11 +125,11 @@
 
 if {$matches != [expr $task_cnt * 3]} {
 	send_user "\nFAILURE: spawn_task communications failure\n"
-	set exit_code 1	
+	set exit_code 1
 }
 
 if {$exit_code == 0} {
-	exec $bin_rm -f $io_prog $test_prog $test_out
+	file delete $io_prog $test_prog
 	send_user "\nSUCCESS\n"
 }
 exit $exit_code
diff --git a/testsuite/expect/test7.3.io.c b/testsuite/expect/test7.3.io.c
index 277805522038389f2366b499b9c953dd78d85041..8d87b698521ce33536cfcb47eb7837b8d7391ee5 100644
--- a/testsuite/expect/test7.3.io.c
+++ b/testsuite/expect/test7.3.io.c
@@ -1,5 +1,7 @@
 /*****************************************************************************\
- *  test7.3.io.c - Test of slurm_spawn API (needed on IBM SP systems).
+ *  test7.3.io.c - Test of "user managed" IO with the slurm_step_launch()
+ *                 API function (required for "poe" launch on IBM
+ *                 AIX systems).
  *
  *  Writes short message to stdout, another from stderr, reads message from
  *  stdin and writes it back to stdout with header.
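The io helper described above relies on one property of user-managed IO: a task's stdin, stdout, and stderr are all the same TCP stream, so everything the task reads and writes travels over its single per-task socket. A self-contained POSIX demonstration of that fd plumbing (a socketpair stands in for the stepd's TCP connection back to srun; this is illustrative, not patch code):

/* Standalone sketch: one stream socket serves as a child's stdin,
 * stdout, and stderr, the same wiring user_managed_io_client_connect()
 * plus io_dup_stdio() set up for a real task. */
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void)
{
	int sv[2];
	char buf[64];
	ssize_t n;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return 1;
	if (fork() == 0) {		/* "task" side */
		dup2(sv[1], STDIN_FILENO);	/* one socket on all */
		dup2(sv[1], STDOUT_FILENO);	/* three stdio fds   */
		dup2(sv[1], STDERR_FILENO);
		close(sv[0]);
		close(sv[1]);
		printf("hello from the task\n");
		fflush(stdout);
		return 0;
	}
	close(sv[1]);		/* "srun" side reads the task's output */
	n = read(sv[0], buf, sizeof(buf) - 1);
	if (n > 0) {
		buf[n] = '\0';
		printf("parent read: %s", buf);
	}
	close(sv[0]);
	return 0;
}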
@@ -43,7 +45,7 @@ int main(int argc, char **argv)
 	sprintf(buf1, "task %d write to stdout:", procid);
 	write(STDOUT_FILENO, buf1, strlen(buf1));
 	sprintf(buf1, "task %d write to stderr:", procid);
-	write(STDOUT_FILENO, buf1, strlen(buf1));	
+	write(STDERR_FILENO, buf1, strlen(buf1));
 
 	while ((size = read(STDIN_FILENO, buf1, sizeof(buf1))) != 0) {
 		if (size > 0) {
 			int offset;
@@ -69,7 +71,5 @@ int main(int argc, char **argv)
 	close(STDOUT_FILENO);
 	close(STDERR_FILENO);
 
-	sleep (10);
-
 	return (0);
 }
diff --git a/testsuite/expect/test7.3.prog.c b/testsuite/expect/test7.3.prog.c
index 0762a7948b26e3ea8cf7711b8459edd57e144828..8a8d670b39d69bc899095365d5bb4701192ecccc 100644
--- a/testsuite/expect/test7.3.prog.c
+++ b/testsuite/expect/test7.3.prog.c
@@ -1,5 +1,7 @@
 /*****************************************************************************\
- *  test7.3.prog.c - Test of slurm_spawn API (needed on IBM SP systems).
+ *  test7.3.prog.c - Test of "user managed" IO with the slurm_step_launch()
+ *                   API function (required for "poe" launch on IBM
+ *                   AIX systems).
  *
  *  Usage: test7.3.prog [min_nodes] [max_nodes] [tasks]
 *****************************************************************************
@@ -35,9 +37,8 @@
 #include <slurm/slurm.h>
 #include <slurm/slurm_errno.h>
 
-#define TASKS_PER_NODE 1	/* Can't have more with current spawn RPC */
+#define TASKS_PER_NODE 1
 
-static int *_build_socket_array(int tasks);
 static void _do_task_work(int *fd_array, int tasks);
 
 int main (int argc, char *argv[])
@@ -47,9 +48,11 @@ int main (int argc, char *argv[])
 	resource_allocation_response_msg_t *job_resp;
 	job_step_create_request_msg_t step_req;
 	slurm_step_ctx ctx = NULL;
+	slurm_job_step_launch_t launch;
 	char *task_argv[3];
-	char cwd[128];
+	char cwd[PATH_MAX];
 	int *fd_array = NULL;
+	int num_fd;
 
 	if (argc > 1) {
 		i = atoi(argv[1]);
@@ -105,8 +108,10 @@ int main (int argc, char *argv[])
 	printf("Starting %d tasks on %d nodes\n", tasks, nodes);
 	fflush(stdout);
 
-	/* Set up step configuration */
-	bzero(&step_req, sizeof(job_step_create_request_msg_t ));
+	/*
+	 * Create a job step context.
+	 */
+	memset(&step_req, 0, sizeof(job_step_create_request_msg_t));
 	step_req.job_id = job_resp->job_id;
 	step_req.user_id = getuid();
 	step_req.node_count = nodes;
@@ -119,28 +124,53 @@ int main (int argc, char *argv[])
 		goto done;
 	}
 
-	task_argv[0] = "./test7.3.io";
-	if (slurm_step_ctx_set(ctx, SLURM_STEP_CTX_ARGS,
-			       1, task_argv))
-		slurm_perror("slurm_step_ctx_create");
-	getcwd(cwd, sizeof(cwd));
-	if (slurm_step_ctx_set(ctx, SLURM_STEP_CTX_CHDIR, cwd))
-		slurm_perror("slurm_step_ctx_create");
-	fd_array = _build_socket_array(tasks);
+	/*
+	 * Hack to run one task per node, regardless of what we set up
+	 * when we created the job step context.
+	 */
+	if (slurm_step_ctx_daemon_per_node_hack(ctx) != SLURM_SUCCESS) {
+		slurm_perror("slurm_step_ctx_daemon_per_node_hack");
+		rc = 1;
+		goto done;
+	}
 
-	/* Spawn the tasks */
-	if (slurm_spawn(ctx, fd_array)) {
-		slurm_perror("slurm_spawn");
-		slurm_kill_job(job_resp->job_id, SIGKILL, 0);
+	/*
+	 * Launch the tasks using "user managed" IO.
+	 * "user managed" IO means a TCP stream for each task, directly
+	 * connected to the stdin, stdout, and stderr of the task.
+	 */
+	slurm_job_step_launch_t_init(&launch);
+	task_argv[0] = "./test7.3.io";
+	launch.argv = task_argv;
+	launch.argc = 1;
+	getcwd(cwd, PATH_MAX);
+	launch.cwd = cwd;
+	launch.user_managed_io = true; /* This is the key to using
+					  "user managed" IO */
+
+	if (slurm_step_launch(ctx, &launch, NULL) != SLURM_SUCCESS) {
+		slurm_perror("slurm_step_launch");
 		rc = 1;
 		goto done;
 	}
 
-	/* Interact with spawned tasks as desired */
+	if (!slurm_step_launch_wait_start(ctx)) {
+		slurm_perror("slurm_step_launch_wait_start");
+		rc = 1;
+		goto done;
+	}
+
+	slurm_step_ctx_get(ctx, SLURM_STEP_CTX_USER_MANAGED_SOCKETS,
+			   &num_fd, &fd_array);
+
+	/* Interact with launched tasks as desired */
 	_do_task_work(fd_array, tasks);
 
-	if (slurm_spawn_kill(ctx, SIGKILL))
-		slurm_perror("slurm_spawn_kill");
+	for (i = 0; i < tasks; i++) {
+		close(fd_array[i]);
+	}
+
+	slurm_step_launch_wait_finish(ctx);
 
 	/* Terminate the job killing all tasks */
 done:	slurm_kill_job(job_resp->job_id, SIGKILL, 0);
@@ -149,56 +179,25 @@ done:	slurm_kill_job(job_resp->job_id, SIGKILL, 0);
 	slurm_free_resource_allocation_response_msg(job_resp);
 	if (ctx)
 		slurm_step_ctx_destroy(ctx);
-	if (fd_array) {
-		for (i=0; i<tasks; i++)
-			(void) close(fd_array[i]);
-		free(fd_array);
-	}
 	exit(0);
 }
 
-
-static int *_build_socket_array(int tasks)
-{
-	int *fd_array;
-	int i, val;
-
-	fd_array = malloc(tasks * sizeof(int));
-	if (fd_array == NULL) {
-		fprintf(stderr, "malloc error\n");
-		exit(0);
-	}
-
-	for (i=0; i<tasks; i++) {
-		if ((fd_array[i] = socket(AF_INET, SOCK_STREAM,
-					  IPPROTO_TCP)) < 0) {
-			fprintf(stderr, "malloc error\n");
-			exit(0);
-		}
-		setsockopt(fd_array[i], SOL_SOCKET, SO_REUSEADDR,
-			   &val, sizeof(int));
-	}
-
-	return fd_array;
-}
-
 static void _do_task_work(int *fd_array, int tasks)
 {
-	int i, j, size, fd;
+	int i, j, size;
 	char buf[1024];
-	struct sockaddr sock_addr;
-	socklen_t sock_len = sizeof(sock_addr);
 
 	for (i=0; i<tasks; i++) {
-		fd = accept(fd_array[i], &sock_addr, &sock_len);
-		if (fd < 0) {
-			perror("accept");
+		if (fd_array[i] < 0) {
+			fprintf(stderr, "Invalid file descriptor\n");
 			continue;
 		}
 
 		sprintf(buf, "test message");
-		write(fd, buf, strlen(buf));
+		write(fd_array[i], buf, strlen(buf));
 
 		while (1) {
-			size = read(fd, buf, sizeof(buf));
+			size = read(fd_array[i], buf, sizeof(buf));
 			if (size > 0) {
 				printf("task %d read:size:%d:msg:", i, size);
 				for (j=0; j<size; j++)
@@ -214,8 +213,6 @@ static void _do_task_work(int *fd_array, int tasks)
 				break;
 			}
 		}
-		close(fd);
-		close(fd_array[i]);
 	}
 	return;
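_do_task_work() drains the per-task sockets one at a time, which is fine for this single-task test but serializes IO when several tasks are running. With the fd_array returned by SLURM_STEP_CTX_USER_MANAGED_SOCKETS in hand, a caller could just as well multiplex them; a minimal sketch using plain poll(2) (this is an illustration, not part of the patch):

/* Sketch: service all per-task user-managed IO sockets concurrently.
 * fd_array is assumed to come from SLURM_STEP_CTX_USER_MANAGED_SOCKETS;
 * everything else is plain POSIX. */
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

static void _poll_task_io(int *fd_array, int tasks)
{
	struct pollfd *pfds = calloc(tasks, sizeof(struct pollfd));
	int open_fds = tasks, i;
	char buf[1024];

	if (pfds == NULL)
		return;
	for (i = 0; i < tasks; i++) {
		pfds[i].fd = fd_array[i];
		pfds[i].events = POLLIN;
	}
	while (open_fds > 0 && poll(pfds, tasks, -1) > 0) {
		for (i = 0; i < tasks; i++) {
			ssize_t n;
			if (!(pfds[i].revents & (POLLIN | POLLHUP)))
				continue;
			n = read(pfds[i].fd, buf, sizeof(buf));
			if (n > 0) {
				printf("task %d read %zd bytes\n", i, n);
			} else {
				/* EOF or error: stop polling this task
				 * (poll ignores negative fds) */
				pfds[i].fd = -1;
				open_fds--;
			}
		}
	}
	free(pfds);
}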