From 9ba95c0e862c758ccf007b68235dd1696ea75c28 Mon Sep 17 00:00:00 2001
From: Moe Jette <jette1@llnl.gov>
Date: Tue, 15 Jun 2004 19:40:07 +0000
Subject: [PATCH] Added support for slurm_spawn() and associated APIs.

---
 NEWS                                  |   2 +
 doc/man/Makefile.am                   |   5 +
 doc/man/man3/slurm_spawn.3            | 294 +++++++++++
 doc/man/man3/slurm_spawn_kill.3       |   1 +
 doc/man/man3/slurm_step_ctx_create.3  |   1 +
 doc/man/man3/slurm_step_ctx_destroy.3 |   1 +
 doc/man/man3/slurm_step_ctx_set.3     |   1 +
 slurm/slurm.h.in                      |  60 +++
 src/api/Makefile.am                   |   1 +
 src/api/spawn.c                       | 729 ++++++++++++++++++++++++++
 src/common/slurm_protocol_defs.c      |  29 +-
 src/common/slurm_protocol_defs.h      |  29 +-
 src/common/slurm_protocol_pack.c      |  79 +++
 src/slurmd/job.c                      |  74 ++-
 src/slurmd/job.h                      |   2 +
 src/slurmd/mgr.c                      |  73 ++-
 src/slurmd/mgr.h                      |   4 +
 src/slurmd/req.c                      |  89 +++-
 src/slurmd/smgr.c                     |  30 +-
 19 files changed, 1484 insertions(+), 20 deletions(-)
 create mode 100644 doc/man/man3/slurm_spawn.3
 create mode 100644 doc/man/man3/slurm_spawn_kill.3
 create mode 100644 doc/man/man3/slurm_step_ctx_create.3
 create mode 100644 doc/man/man3/slurm_step_ctx_destroy.3
 create mode 100644 doc/man/man3/slurm_step_ctx_set.3
 create mode 100644 src/api/spawn.c

diff --git a/NEWS b/NEWS
index b2c669b491b..9c4ae893efe 100644
--- a/NEWS
+++ b/NEWS
@@ -5,6 +5,8 @@ documents those changes that are of interest to users and admins.
 ========================
  -- Fix "SLURM_RLIMIT_* not found in environment" error message when
     distributing large rlimit to jobs.
+ -- Add support for slurm_spawn() and associated APIs (needed for IBM
+    SP systems).
 
 * Changes in SLURM 0.3.4
 ========================
diff --git a/doc/man/Makefile.am b/doc/man/Makefile.am
index 6565ee8a344..e51371d7551 100644
--- a/doc/man/Makefile.am
+++ b/doc/man/Makefile.am
@@ -53,6 +53,11 @@ man3_MANS = man3/slurm_hostlist_create.3 \
 	man3/slurm_print_partition_info_msg.3 \
 	man3/slurm_reconfigure.3 \
 	man3/slurm_shutdown.3 \
+	man3/slurm_spawn.3 \
+	man3/slurm_spawn_kill.3 \
+	man3/slurm_step_ctx_create.3 \
+	man3/slurm_step_ctx_destroy.3 \
+	man3/slurm_step_ctx_set.3 \
 	man3/slurm_strerror.3 \
 	man3/slurm_submit_batch_job.3 \
 	man3/slurm_update_job.3 \
diff --git a/doc/man/man3/slurm_spawn.3 b/doc/man/man3/slurm_spawn.3
new file mode 100644
index 00000000000..5ffe916337c
--- /dev/null
+++ b/doc/man/man3/slurm_spawn.3
@@ -0,0 +1,294 @@
+.TH "Slurm API" "3" "June 2004" "Morris Jette" "Slurm task spawn functions"
+.SH "NAME"
+slurm_spawn \- Slurm task spawn functions
+.SH "SYNTAX"
+.LP
+#include <slurm/slurm.h>
+.LP
+.LP
+slurm_step_ctx \fBslurm_step_ctx_create\fR (
+.br
+	job_step_create_request_msg_t *\fIstep_req\fP
+.br
+);
+.LP
+int \fBslurm_step_ctx_set\fR (
+.br
+	slurm_step_ctx \fIctx\fP,
+.br
+	int \fIctx_key\fP,
+.br
+	...
+.br
+);
+.LP
+int \fBslurm_spawn\fR (
+.br
+	slurm_step_ctx \fIctx\fP,
+.br
+	int *\fIfd_array\fP
+.br
+);
+.LP
+int \fBslurm_spawn_kill\fR (
+.br
+	slurm_step_ctx \fIctx\fP,
+.br
+	uint16_t \fIsignal\fP
+.br
+);
+.LP
+int \fBslurm_step_ctx_destroy\fR (
+.br
+	slurm_step_ctx \fIctx\fP
+.br
+);
+.SH "ARGUMENTS"
+.LP
+.TP
+\fIstep_req\fP
+Pointer to a structure containing the job step request specification. See
+slurm.h for full details on the data structure's contents.
+.TP
+\fIctx\fP
+Job step context. Created by \fBslurm_step_ctx_create\fR, used in subsequent
+function calls, and destroyed by \fBslurm_step_ctx_destroy\fR.
+.TP
+\fIctx_key\fP
+Identifies the fields in \fIctx\fP to be set by \fBslurm_step_ctx_set\fR.
+.TP
+\fIfd_array\fP
+Array of socket file descriptors to be connected to the initiated tasks.
+Tasks will be connected to these file descriptors in order of their
+task id.
+Each socket will carry standard input, output and error for one task.
+.TP
+\fIsignal\fP
+Signal to be sent to the spawned tasks.
+.SH "DESCRIPTION"
+.LP
+\fBslurm_step_ctx_create\fR Create a job step context. To avoid memory
+leaks call \fBslurm_step_ctx_destroy\fR when the use of this context is
+finished.
+.LP
+\fBslurm_step_ctx_set\fR Set values in a job step context.
+\fIctx_key\fP identifies the fields to be set in the job step context.
+Subsequent arguments to this function are dependent upon the value
+of \fIctx_key\fP. See the \fBCONTEXT KEYS\fR section for details.
+.LP
+\fBslurm_spawn\fR Spawn tasks based upon a job step context
+and establish communications with the tasks using the socket
+file descriptors specified.
+Note that this function can only be called once for each job
+step context.
+Establish a new job step context for each set of tasks to be spawned.
+.LP
+\fBslurm_spawn_kill\fR Signal the tasks spawned for this context
+by \fBslurm_spawn\fR.
+.LP
+\fBslurm_step_ctx_destroy\fR Destroy a job step context created by
+\fBslurm_step_ctx_create\fR.
+.SH "CONTEXT KEYS"
+.TP
+\fBSLURM_STEP_CTX_ARGS\fR
+Set the argument count and values for the executable.
+Accepts two additional arguments, the first of type int and
+the second of type char **.
+.TP
+\fBSLURM_STEP_CTX_CHDIR\fR
+Have the remote process change directory to the specified location
+before beginning execution. Accepts one argument of type
+char * identifying the directory's pathname. By default
+the remote process will execute in the same directory pathname
+from which it is spawned. NOTE: This assumes the same directory
+pathname exists on the other nodes.
+.TP
+\fBSLURM_STEP_CTX_ENV\fR
+Set the environment variable count and values for the executable.
+Accepts two additional arguments, the first of type int and
+the second of type char **. By default the current environment
+variables are copied to the started task's environment.
+.SH "RETURN VALUE"
+.LP
+For \fBslurm_step_ctx_create\fR a context is returned upon success. On error
+NULL is returned and the Slurm error code is set appropriately.
+.LP
+For all other functions zero is returned upon success.
+On error, -1 is returned, and the Slurm error code is set appropriately.
+.SH "ERRORS"
+.LP
+\fBEINVAL\fR Invalid argument.
+.LP
+\fBSLURM_PROTOCOL_VERSION_ERROR\fR Protocol version has changed, re-link your code.
+.LP
+\fBESLURM_INVALID_JOB_ID\fR The requested job id does not exist.
+.LP
+\fBESLURM_ALREADY_DONE\fR The specified job has already completed and cannot be modified.
+.LP
+\fBESLURM_ACCESS_DENIED\fR The requesting user lacks authorization for the requested action (e.g. trying to delete or modify another user's job).
+.LP
+\fBESLURM_INTERCONNECT_FAILURE\fR Failed to configure the node interconnect.
+.LP
+\fBESLURM_BAD_DIST\fR Task distribution specification is invalid.
+.LP
+\fBSLURM_PROTOCOL_SOCKET_IMPL_TIMEOUT\fR Timeout in communicating with
+SLURM controller.
+.SH "EXAMPLE"
+.LP
+#include <signal.h>
+.br
+#include <stdio.h>
+.br
+#include <stdlib.h>
+.br
+#include <string.h>
+.br
+#include <unistd.h>
+.br
+#include <sys/types.h>
+.br
+#include <slurm/slurm.h>
+.br
+#include <slurm/slurm_errno.h>
+.LP
+static int *_build_socket_array(int nodes);
+.br
+static void _do_task_work(int *fd_array, int nodes);
+.LP
+int main (int argc, char *argv[])
+.br
+{
+.br
+	int i, nodes = 1, tasks;
+.br
+	job_desc_msg_t job_req;
+.br
+	resource_allocation_response_msg_t *job_resp;
+.br
+	job_step_create_request_msg_t step_req;
+.br
+	slurm_step_ctx ctx;
+.br
+	char *task_argv[2];
+.br
+	int *fd_array;
+.LP
+	if (argc > 1) {
+.br
+		i = atoi(argv[1]);
+.br
+		if (i > 0)
+.br
+			nodes = i;
+.br
+	}
+.br
+	tasks = nodes;
+.LP
+	/* Create a job allocation */
+.br
+	slurm_init_job_desc_msg( &job_req );
+.br
+	job_req.min_nodes = nodes;
+.br
+	if (slurm_allocate_resources(&job_req, &job_resp)) {
+.br
+		slurm_perror ("slurm_allocate_resources error");
+.br
+		exit (1);
+.br
+	}
+.LP
+	/* Set up step configuration */
+.br
+	bzero(&step_req, sizeof(job_step_create_request_msg_t));
+.br
+	step_req.job_id = job_resp->job_id;
+.br
+	step_req.user_id = getuid();
+.br
+	step_req.node_count = nodes;
+.br
+	step_req.num_tasks = tasks;
+.br
+	ctx = slurm_step_ctx_create(&step_req);
+.br
+	if (ctx == NULL) {
+.br
+		slurm_perror("slurm_step_ctx_create");
+.br
+		exit(1);
+.br
+	}
+.br
+	printf("Starting %d tasks on %d nodes\n",
+.br
+		step_req.num_tasks, step_req.node_count);
+.br
+	task_argv[0] = "/bin/hostname";
+.br
+	if (slurm_step_ctx_set(ctx, SLURM_STEP_CTX_ARGS,
+.br
+			1, task_argv))
+.br
+		slurm_perror("slurm_step_ctx_set");
+.br
+	fd_array = _build_socket_array(tasks);
+.LP
+	/* Spawn the tasks */
+.br
+	if (slurm_spawn(ctx, fd_array)) {
+.br
+		slurm_perror("slurm_spawn");
+.br
+		slurm_kill_job(job_resp->job_id, SIGKILL, 0);
+.br
+		exit(1);
+.br
+	}
+.LP
+	/* Interact with spawned tasks as desired */
+.br
+	_do_task_work(fd_array, tasks);
+.LP
+	if (slurm_spawn_kill(ctx, SIGKILL))
+.br
+		slurm_perror("slurm_spawn_kill");
+.LP
+	/* Terminate the job, killing all tasks */
+.br
+	slurm_kill_job(job_resp->job_id, SIGKILL, 0);
+.br
+	slurm_step_ctx_destroy(ctx);
+.br
+	slurm_free_resource_allocation_response_msg(job_resp);
+.br
+	exit (0);
+.br
+}
+
+.SH "COPYING"
+Copyright (C) 2004 The Regents of the University of California.
+Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
+UCRL-CODE-2002-040.
+.LP
+This file is part of SLURM, a resource management program.
+For details, see <http://www.llnl.gov/linux/slurm/>.
+.LP
+SLURM is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free
+Software Foundation; either version 2 of the License, or (at your option)
+any later version.
+.LP
+SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+details.
+.SH "SEE ALSO"
+.LP
+\fBslurm_allocate_resources\fR(3), \fBslurm_job_step_create\fR(3),
+\fBslurm_kill_job\fR(3),
+\fBslurm_get_errno\fR(3), \fBslurm_perror\fR(3), \fBslurm_strerror\fR(3),
+\fBsrun\fR(1)
diff --git a/doc/man/man3/slurm_spawn_kill.3 b/doc/man/man3/slurm_spawn_kill.3
new file mode 100644
index 00000000000..02d58710e5d
--- /dev/null
+++ b/doc/man/man3/slurm_spawn_kill.3
@@ -0,0 +1 @@
+.so man3/slurm_spawn.3
diff --git a/doc/man/man3/slurm_step_ctx_create.3 b/doc/man/man3/slurm_step_ctx_create.3
new file mode 100644
index 00000000000..02d58710e5d
--- /dev/null
+++ b/doc/man/man3/slurm_step_ctx_create.3
@@ -0,0 +1 @@
+.so man3/slurm_spawn.3
diff --git a/doc/man/man3/slurm_step_ctx_destroy.3 b/doc/man/man3/slurm_step_ctx_destroy.3
new file mode 100644
index 00000000000..02d58710e5d
--- /dev/null
+++ b/doc/man/man3/slurm_step_ctx_destroy.3
@@ -0,0 +1 @@
+.so man3/slurm_spawn.3
diff --git a/doc/man/man3/slurm_step_ctx_set.3 b/doc/man/man3/slurm_step_ctx_set.3
new file mode 100644
index 00000000000..02d58710e5d
--- /dev/null
+++ b/doc/man/man3/slurm_step_ctx_set.3
@@ -0,0 +1 @@
+.so man3/slurm_spawn.3
diff --git a/slurm/slurm.h.in b/slurm/slurm.h.in
index 7c01f160903..0f54629b141 100644
--- a/slurm/slurm.h.in
+++ b/slurm/slurm.h.in
@@ -162,6 +162,13 @@ enum node_states {
  * Values can be ORed */
 #define SHOW_ALL 1	/* Show info for "hidden" partitions */
 
+/* Define keys for ctx_key argument of slurm_step_ctx_set() */
+enum ctx_keys {
+	SLURM_STEP_CTX_ARGS,	/* set argument count and values */
+	SLURM_STEP_CTX_CHDIR,	/* set directory to run from */
+	SLURM_STEP_CTX_ENV	/* set environment variable count and values */
+};
+
 /*****************************************************************************\
  *	PROTOCOL DATA STRUCTURE DEFINITIONS
 \*****************************************************************************/
@@ -454,6 +461,10 @@
 
 typedef struct partition_info update_part_msg_t;
 
+/* Opaque data type for slurm_step_ctx_*, slurm_spawn, and
+ * slurm_spawn_kill functions */
+typedef struct slurm_step_ctx_struct *slurm_step_ctx;
+
 /*****************************************************************************\
  *	RESOURCE ALLOCATION FUNCTIONS
 \*****************************************************************************/
@@ -624,6 +635,55 @@ extern int slurm_complete_job_step PARAMS((
 	uint32_t job_return_code,
 	uint32_t system_return_code));
 
+/*****************************************************************************\
+ *	SLURM TASK SPAWNING FUNCTIONS
+\*****************************************************************************/
+
+/*
+ * slurm_step_ctx_create - Create a job step and its context.
+ * IN step_req - description of job step request
+ * RET the step context or NULL on failure with slurm errno set
+ * NOTE: Free allocated memory using slurm_step_ctx_destroy.
+ */
+extern slurm_step_ctx slurm_step_ctx_create PARAMS((
+	job_step_create_request_msg_t *step_req));
+
+
+/*
+ * slurm_step_ctx_set - set parameters in job step context.
+ * IN ctx - job step context generated by slurm_step_ctx_create
+ * RET SLURM_SUCCESS or SLURM_ERROR (with slurm_errno set)
+ */
+extern int slurm_step_ctx_set PARAMS((slurm_step_ctx ctx,
+	int ctx_key, ...));
+
+/*
+ * slurm_step_ctx_destroy - free allocated memory for a job step context.
+ * IN ctx - job step context generated by slurm_step_ctx_create
+ * RET SLURM_SUCCESS or SLURM_ERROR (with slurm_errno set)
+ */
+extern int slurm_step_ctx_destroy PARAMS((slurm_step_ctx ctx));
+
+
+/*
+ * slurm_spawn - spawn tasks for the given job step context
+ * IN ctx - job step context generated by slurm_step_ctx_create
+ * IN fd_array - array of socket file descriptors to connect with
+ *	stdin, stdout, and stderr of spawned task
+ * RET SLURM_SUCCESS or SLURM_ERROR (with slurm_errno set)
+ */
+extern int slurm_spawn PARAMS((slurm_step_ctx ctx,
+	int *fd_array));
+
+/*
+ * slurm_spawn_kill - send the specified signal to an existing job step
+ * IN ctx - job step context generated by slurm_step_ctx_create
+ * IN signal - signal number
+ * RET SLURM_SUCCESS or SLURM_ERROR (with slurm_errno set)
+ */
+extern int slurm_spawn_kill PARAMS((slurm_step_ctx ctx,
+	uint16_t signal));
+
 /*****************************************************************************\
  *	SLURM CONTROL CONFIGURATION READ/PRINT/UPDATE FUNCTIONS
 \*****************************************************************************/
diff --git a/src/api/Makefile.am b/src/api/Makefile.am
index 6b7fe6ec21a..69b2ec0f75c 100644
--- a/src/api/Makefile.am
+++ b/src/api/Makefile.am
@@ -45,6 +45,7 @@ libslurm_la_SOURCES = \
 	job_step_info.c \
 	node_info.c \
 	partition_info.c \
+	spawn.c \
 	submit.c \
 	reconfigure.c \
 	update_config.c
diff --git a/src/api/spawn.c b/src/api/spawn.c
new file mode 100644
index 00000000000..c0894aad2f5
--- /dev/null
+++ b/src/api/spawn.c
@@ -0,0 +1,729 @@
+/*****************************************************************************\
+ *  spawn.c - spawn task functions
+ *****************************************************************************
+ *  Copyright (C) 2004 The Regents of the University of California.
+ *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
+ *  Written by Morris Jette <jette1@llnl.gov>.
+ *  UCRL-CODE-2002-040.
+ *
+ *  This file is part of SLURM, a resource management program.
+ *  For details, see <http://www.llnl.gov/linux/slurm/>.
+ *
+ *  SLURM is free software; you can redistribute it and/or modify it under
+ *  the terms of the GNU General Public License as published by the Free
+ *  Software Foundation; either version 2 of the License, or (at your option)
+ *  any later version.
+ *
+ *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
+ *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
+ *  details.
+ *
+ *  You should have received a copy of the GNU General Public License along
+ *  with SLURM; if not, write to the Free Software Foundation, Inc.,
+ *  59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+\*****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include <errno.h> +#include <pthread.h> +#include <stdarg.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <netinet/in.h> +#include <sys/param.h> +#include <sys/socket.h> +#include <sys/types.h> + +#include <slurm/slurm.h> + +#include "src/common/hostlist.h" +#include "src/common/slurm_protocol_api.h" +#include "src/common/slurm_protocol_defs.h" +#include "src/common/xmalloc.h" +#include "src/common/xstring.h" + +#define _DEBUG 0 +#define _MAX_THREAD_COUNT 50 +#define STEP_CTX_MAGIC 0xc7a3 + +extern char **environ; + +struct slurm_step_ctx_struct { + uint16_t magic; /* magic number */ + + uint32_t job_id; /* assigned job id */ + uint32_t user_id; /* user the job runs as */ + uint32_t num_tasks; /* number of tasks to execute */ + uint16_t task_dist; /* see enum task_dist_state */ + + resource_allocation_response_msg_t *alloc_resp; + job_step_create_response_msg_t *step_resp; + + char *cwd; /* working directory */ + uint32_t argc; /* count of arguments */ + char **argv; /* argument list */ + uint16_t env_set; /* flag if user set env */ + uint32_t envc; /* count of env vars */ + char **env; /* environment variables */ + + char **host; /* name for each host */ + uint32_t *cpus; /* count of processors on each host */ + uint32_t *tasks; /* number of tasks on each host */ + uint32_t **tids; /* host id => task id mapping */ + hostlist_t hl; /* hostlist of assigned nodes */ + uint32_t nhosts; /* node count */ +}; + +typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t; +typedef struct thd { + pthread_t thread; /* thread ID */ + pthread_attr_t attr; /* pthread attributes */ + state_t state; /* thread state */ + time_t tstart; /* time thread started */ + slurm_msg_t * req; /* the message to send */ +} thd_t; + +static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t thread_cond = PTHREAD_COND_INITIALIZER; +static uint32_t threads_active = 0; /* currently active threads */ + +#if _DEBUG +static void _dump_ctx(slurm_step_ctx ctx); +#endif +static int _envcount(char **env); +static void _free_char_array(char ***argv_p, int cnt); +static int _p_launch(slurm_msg_t *req, slurm_step_ctx ctx); +static int _sock_bind_wild(int sockfd); +static int _task_layout(slurm_step_ctx ctx); +static int _task_layout_block(slurm_step_ctx ctx); +static int _task_layout_cyclic(slurm_step_ctx ctx); +static void * _thread_per_node_rpc(void *args); +static int _validate_ctx(slurm_step_ctx ctx); +static void _xcopy_char_array(char ***argv_p, char **argv, int cnt); +static void _xfree_char_array(char ***argv_p, int cnt); + +/* + * slurm_step_ctx_create - Create a job step and its context. + * IN step_req - description of job step request + * RET the step context or NULL on failure with slurm errno set + * NOTE: Free allocated memory using slurm_step_ctx_destroy. 
+ */ +extern slurm_step_ctx +slurm_step_ctx_create (job_step_create_request_msg_t *step_req) +{ + struct slurm_step_ctx_struct *rc; + old_job_alloc_msg_t old_job_req; + job_step_create_response_msg_t *step_resp = NULL; + resource_allocation_response_msg_t *alloc_resp; + + old_job_req.job_id = step_req->job_id; + old_job_req.uid = getuid(); + if (slurm_confirm_allocation(&old_job_req, &alloc_resp) < 0) + return NULL; + + if ((slurm_job_step_create(step_req, &step_resp) < 0) || + (step_resp == NULL)) { + slurm_free_resource_allocation_response_msg(alloc_resp); + return NULL; /* slurm errno already set */ + } + + rc = xmalloc(sizeof(struct slurm_step_ctx_struct)); + rc->magic = STEP_CTX_MAGIC; + rc->job_id = step_req->job_id; + rc->user_id = step_req->user_id; + rc->num_tasks = step_req->num_tasks; + rc->task_dist = step_req->task_dist; + rc->step_resp = step_resp; + rc->alloc_resp = alloc_resp; + + return rc; +} + +/* + * slurm_step_ctx_set - set parameters in job step context. + * IN ctx - job step context generated by slurm_step_ctx_create + * RET SLURM_SUCCESS or SLURM_ERROR (with slurm_errno set) + */ +extern int +slurm_step_ctx_set (slurm_step_ctx ctx, int ctx_key, ...) +{ + va_list ap; + int rc = SLURM_SUCCESS; + + if ((ctx == NULL) || + (ctx->magic != STEP_CTX_MAGIC)) { + slurm_seterrno(EINVAL); + return SLURM_ERROR; + } + + va_start(ap, ctx_key); + switch (ctx_key) { + case SLURM_STEP_CTX_ARGS: + if (ctx->argv) + _xfree_char_array(&ctx->argv, ctx->argc); + ctx->argc = va_arg(ap, int); + if ((ctx->argc < 1) || (ctx->argc > 1024)) { + slurm_seterrno(EINVAL); + break; + } + _xcopy_char_array(&ctx->argv, va_arg(ap, char **), + ctx->argc); + break; + + case SLURM_STEP_CTX_CHDIR: + if (ctx->cwd) + xfree(ctx->cwd); + ctx->cwd = xstrdup(va_arg(ap, char *)); + break; + + case SLURM_STEP_CTX_ENV: + ctx->env_set = 1; + if (ctx->env) + _xfree_char_array(&ctx->env, ctx->envc); + ctx->envc = va_arg(ap, int); + if ((ctx->envc < 1) || (ctx->envc > 1024)) { + slurm_seterrno(EINVAL); + break; + } + _xcopy_char_array(&ctx->env, va_arg(ap, char **), + ctx->envc); + break; + + default: + slurm_seterrno(EINVAL); + rc = SLURM_ERROR; + } + va_end(ap); + + return rc; +} + + +/* + * slurm_step_ctx_destroy - free allocated memory for a job step context. 
+ * IN ctx - job step context generated by slurm_step_ctx_create
+ * RET SLURM_SUCCESS or SLURM_ERROR (with slurm_errno set)
+ */
+extern int
+slurm_step_ctx_destroy (slurm_step_ctx ctx)
+{
+	if ((ctx == NULL) ||
+	    (ctx->magic != STEP_CTX_MAGIC)) {
+		slurm_seterrno(EINVAL);
+		return SLURM_ERROR;
+	}
+
+	if (ctx->step_resp)
+		slurm_free_job_step_create_response_msg(ctx->step_resp);
+
+	if (ctx->alloc_resp)
+		slurm_free_resource_allocation_response_msg(ctx->alloc_resp);
+
+	if (ctx->argv)
+		_xfree_char_array(&ctx->argv, ctx->argc);
+	if (ctx->env_set)
+		_xfree_char_array(&ctx->env, ctx->envc);
+
+	if (ctx->host)
+		_free_char_array(&ctx->host, ctx->nhosts);
+
+	if (ctx->hl)
+		hostlist_destroy(ctx->hl);
+
+	if (ctx->cpus)
+		xfree(ctx->cpus);
+	if (ctx->tasks)
+		xfree(ctx->tasks);
+
+	if (ctx->tids) {
+		int i;
+		for (i=0; i<ctx->nhosts; i++) {
+			if (ctx->tids[i])
+				xfree(ctx->tids[i]);
+		}
+		xfree(ctx->tids);
+	}
+
+	xfree(ctx);
+	return SLURM_SUCCESS;
+}
+
+
+/*
+ * slurm_spawn - spawn tasks for the given job step context
+ * IN ctx - job step context generated by slurm_step_ctx_create
+ * IN fd_array - array of socket file descriptors to connect with
+ *	stdin, stdout, and stderr of spawned task
+ * RET SLURM_SUCCESS or SLURM_ERROR (with slurm_errno set)
+ */
+extern int slurm_spawn (slurm_step_ctx ctx, int *fd_array)
+{
+	spawn_task_request_msg_t *msg_array_ptr;
+	int *sock_array;
+	slurm_msg_t *req_array_ptr;
+	int i, j, rc = SLURM_SUCCESS;
+
+	if ((ctx == NULL) ||
+	    (ctx->magic != STEP_CTX_MAGIC) ||
+	    (fd_array == NULL)) {
+		slurm_seterrno(EINVAL);
+		return SLURM_ERROR;
+	}
+
+	if (_validate_ctx(ctx))
+		return SLURM_ERROR;
+
+	/* validate fd_array and bind them to ports */
+	sock_array = xmalloc(ctx->num_tasks * sizeof(int));
+	for (i=0; i<ctx->num_tasks; i++) {
+		if (fd_array[i] < 0) {
+			slurm_seterrno(EINVAL);
+			xfree(sock_array);
+			return SLURM_ERROR;
+		}
+		sock_array[i] = _sock_bind_wild(fd_array[i]);
+		if (sock_array[i] < 0) {
+			slurm_seterrno(EINVAL);
+			xfree(sock_array);
+			return SLURM_ERROR;
+		}
+		listen(fd_array[i], 5);
+	}
+
+	msg_array_ptr = xmalloc(sizeof(spawn_task_request_msg_t) *
+			ctx->num_tasks);
+	req_array_ptr = xmalloc(sizeof(slurm_msg_t) * ctx->num_tasks);
+	for (i=0; i<ctx->nhosts; i++) {
+		for (j=0; j<ctx->tasks[i]; j++) {
+			uint32_t tid = ctx->tids[i][j];
+			spawn_task_request_msg_t *r = &msg_array_ptr[tid];
+			slurm_msg_t *m = &req_array_ptr[tid];
+
+			/* Common message contents */
+			r->job_id	= ctx->job_id;
+			r->uid		= ctx->user_id;
+			r->argc		= ctx->argc;
+			r->argv		= ctx->argv;
+			r->cred		= ctx->step_resp->cred;
+			r->job_step_id	= ctx->step_resp->job_step_id;
+			r->envc		= ctx->envc;
+			r->env		= ctx->env;
+			r->cwd		= ctx->cwd;
+			r->nnodes	= ctx->nhosts;
+			r->nprocs	= ctx->num_tasks;
+			r->switch_job	= ctx->step_resp->switch_job;
+			r->slurmd_debug	= 7;
+
+			/* Task-specific message contents */
+			r->global_task_id	= ctx->tids[i][j];
+			r->cpus_allocated	= ctx->cpus[i];
+			r->srun_node_id	= (uint32_t) i;
+			r->io_port	= ntohs(sock_array[tid]);
+			m->msg_type	= REQUEST_SPAWN_TASK;
+			m->data		= r;
+			memcpy(&m->address, &ctx->alloc_resp->node_addr[i],
+				sizeof(slurm_addr));
+#if _DEBUG
+			printf("tid=%u, fd=%d, port=%u, node_id=%d\n",
+				tid, fd_array[tid], r->io_port, i);
+#endif
+		}
+	}
+
+	rc = _p_launch(req_array_ptr, ctx);
+
+	xfree(msg_array_ptr);
+	xfree(req_array_ptr);
+	xfree(sock_array);
+
+	return rc;
+}
+
+
+/*
+ * slurm_spawn_kill - send the specified signal to an existing job step
+ * IN ctx - job step context generated by slurm_step_ctx_create
+ * IN signal - signal number
+ * RET SLURM_SUCCESS or SLURM_ERROR (with slurm_errno set)
+ */
+extern int
+slurm_spawn_kill (slurm_step_ctx ctx, uint16_t signal)
+{
+	if ((ctx == NULL) ||
+	    (ctx->magic != STEP_CTX_MAGIC)) {
+		slurm_seterrno(EINVAL);
+		return SLURM_ERROR;
+	}
+
+	return slurm_kill_job_step (ctx->job_id,
+			ctx->step_resp->job_step_id, signal);
+}
+
+
+static int _sock_bind_wild(int sockfd)
+{
+	socklen_t len;
+	struct sockaddr_in sin;
+
+	memset(&sin, 0, sizeof(sin));
+	sin.sin_family = AF_INET;
+	sin.sin_addr.s_addr = htonl(INADDR_ANY);
+	sin.sin_port = htons(0);	/* bind ephemeral port */
+
+	if (bind(sockfd, (struct sockaddr *) &sin, sizeof(sin)) < 0)
+		return (-1);
+	len = sizeof(sin);
+	if (getsockname(sockfd, (struct sockaddr *) &sin, &len) < 0)
+		return (-1);
+	return (sin.sin_port);
+}
+
+
+/* validate the contents of ctx, set default values as needed */
+static int _validate_ctx(slurm_step_ctx ctx)
+{
+	int rc;
+
+	if (ctx->cwd == NULL) {
+		ctx->cwd = xmalloc(MAXPATHLEN);
+		if (ctx->cwd == NULL) {
+			slurm_seterrno(ENOMEM);
+			return SLURM_ERROR;
+		}
+		getcwd(ctx->cwd, MAXPATHLEN);
+	}
+
+	if (ctx->env_set == 0) {
+		ctx->envc = _envcount(environ);
+		ctx->env  = environ;
+	}
+
+	ctx->hl = hostlist_create(ctx->step_resp->node_list);
+	ctx->nhosts = hostlist_count(ctx->hl);
+
+	rc = _task_layout(ctx);
+
+#if _DEBUG
+	_dump_ctx(ctx);
+#endif
+	return rc;
+}
+
+
+/* build maps for task layout on nodes */
+static int _task_layout(slurm_step_ctx ctx)
+{
+	int cpu_cnt = 0, cpu_inx = 0, i;
+
+	ctx->cpus  = xmalloc(sizeof(uint32_t) * ctx->nhosts);
+	ctx->tasks = xmalloc(sizeof(uint32_t) * ctx->nhosts);
+	ctx->host  = xmalloc(sizeof(char *)   * ctx->nhosts);
+	if ((ctx->cpus == NULL) || (ctx->tasks == NULL) ||
+	    (ctx->host == NULL)) {
+		slurm_seterrno(ENOMEM);
+		return SLURM_ERROR;
+	}
+
+	for (i=0; i<ctx->nhosts; i++) {
+		ctx->host[i] = hostlist_shift(ctx->hl);
+		ctx->cpus[i] = ctx->alloc_resp->cpus_per_node[cpu_inx];
+		if ((++cpu_cnt) >= ctx->alloc_resp->cpu_count_reps[cpu_inx]) {
+			/* move to next record */
+			cpu_inx++;
+			cpu_cnt = 0;
+		}
+	}
+
+	ctx->tids = xmalloc(sizeof(uint32_t *) * ctx->nhosts);
+	if (ctx->tids == NULL) {
+		slurm_seterrno(ENOMEM);
+		return SLURM_ERROR;
+	}
+
+	if (ctx->task_dist == SLURM_DIST_CYCLIC)
+		return _task_layout_cyclic(ctx);
+	else
+		return _task_layout_block(ctx);
+}
+
+
+/* to effectively deal with heterogeneous nodes, we fake a cyclic
+ * distribution to figure out how many tasks go on each node and
+ * then make those assignments in a block fashion */
+static int _task_layout_block(slurm_step_ctx ctx)
+{
+	int i, j, taskid = 0;
+	bool over_subscribe = false;
+
+	/* figure out how many tasks go to each node */
+	for (j=0; (taskid<ctx->num_tasks); j++) {	/* cycle counter */
+		bool space_remaining = false;
+		for (i=0; ((i<ctx->nhosts) && (taskid<ctx->num_tasks)); i++) {
+			if ((j<ctx->cpus[i]) || over_subscribe) {
+				taskid++;
+				ctx->tasks[i]++;
+				if ((j+1) < ctx->cpus[i])
+					space_remaining = true;
+			}
+		}
+		if (!space_remaining)
+			over_subscribe = true;
+	}
+
+	/* now distribute the tasks */
+	taskid = 0;
+	for (i=0; i < ctx->nhosts; i++) {
+		ctx->tids[i] = xmalloc(sizeof(uint32_t) * ctx->tasks[i]);
+		if (ctx->tids[i] == NULL) {
+			slurm_seterrno(ENOMEM);
+			return SLURM_ERROR;
+		}
+		for (j=0; j<ctx->tasks[i]; j++)
+			ctx->tids[i][j] = taskid++;
+	}
+	return SLURM_SUCCESS;
+}
+
+
+/* distribute tasks across available nodes: allocate tasks to nodes
+ * in a cyclic fashion using available processors.
+ * Once all available processors are allocated, continue to allocate
+ * tasks, over-subscribing nodes as needed. For example:
+ *	cpus per node      4  2  4  2
+ *	                  -- -- -- --
+ *	task distribution: 0  1  2  3
+ *	                   4  5  6  7
+ *	                   8     9
+ *	                  10    11     all processors allocated now
+ *	                  12 13 14 15  etc.
+ */
+static int _task_layout_cyclic(slurm_step_ctx ctx)
+{
+	int i, j, taskid = 0;
+	bool over_subscribe = false;
+
+	for (i=0; i<ctx->nhosts; i++) {
+		ctx->tids[i] = xmalloc(sizeof(uint32_t) * ctx->num_tasks);
+		if (ctx->tids[i] == NULL) {
+			slurm_seterrno(ENOMEM);
+			return SLURM_ERROR;
+		}
+	}
+	for (j=0; taskid<ctx->num_tasks; j++) {	/* cycle counter */
+		bool space_remaining = false;
+		for (i=0; ((i<ctx->nhosts) && (taskid<ctx->num_tasks)); i++) {
+			if ((j<ctx->cpus[i]) || over_subscribe) {
+				ctx->tids[i][ctx->tasks[i]] = taskid++;
+				ctx->tasks[i]++;
+				if ((j+1) < ctx->cpus[i])
+					space_remaining = true;
+			}
+		}
+		if (!space_remaining)
+			over_subscribe = true;
+	}
+	return SLURM_SUCCESS;
+}
+
+
+/* return number of elements in environment 'env' */
+static int _envcount(char **env)
+{
+	int envc = 0;
+	while (env[envc] != NULL)
+		envc++;
+	return (envc);
+}
+
+
+#if _DEBUG
+/* dump the contents of a job step context */
+static void _dump_ctx(slurm_step_ctx ctx)
+{
+	int i;
+
+	if ((ctx == NULL) ||
+	    (ctx->magic != STEP_CTX_MAGIC)) {
+		printf("Invalid _dump_ctx argument\n");
+		return;
+	}
+
+	printf("job_id    = %u\n", ctx->job_id);
+	printf("user_id   = %u\n", ctx->user_id);
+	printf("nhosts    = %u\n", ctx->nhosts);
+	printf("num_tasks = %u\n", ctx->num_tasks);
+	printf("task_dist = %u\n", ctx->task_dist);
+
+	printf("step_id   = %u\n", ctx->step_resp->job_step_id);
+	printf("nodelist  = %s\n", ctx->step_resp->node_list);
+
+	printf("cwd       = %s\n", ctx->cwd);
+
+	for (i=0; i<ctx->argc; i++) {
+		printf("argv[%d] = %s\n", i, ctx->argv[i]);
+		if (i > 5) {
+			printf("...\n");
+			break;
+		}
+	}
+
+	for (i=0; i<ctx->envc; i++) {
+		if (strlen(ctx->env[i]) > 50)
+			printf("env[%d] = %.50s...\n", i, ctx->env[i]);
+		else
+			printf("env[%d] = %s\n", i, ctx->env[i]);
+		if (i > 5) {
+			printf("...\n");
+			break;
+		}
+	}
+
+	for (i=0; i<ctx->nhosts; i++)
+		printf("host=%s cpus=%u tasks=%u tid[0]=%u\n",
+			ctx->host[i], ctx->cpus[i], ctx->tasks[i],
+			ctx->tids[i][0]);
+
+	printf("\n");
+}
+#endif
+
+
+/* xfree an array of character strings as created by _xcopy_char_array */
+static void _xfree_char_array(char ***argv_p, int cnt)
+{
+	char **argv = *argv_p;
+	int i;
+
+	for (i=0; i<cnt; i++)
+		xfree(argv[i]);
+	xfree(*argv_p);
+}
+
+
+/* free an array of character strings as created by hostlist_shift */
+static void _free_char_array(char ***argv_p, int cnt)
+{
+	char **argv = *argv_p;
+	int i;
+
+	for (i=0; i<cnt; i++)
+		free(argv[i]);
+	xfree(*argv_p);
+}
+
+
+/* copy a character array, free with _xfree_char_array */
+static void _xcopy_char_array(char ***argv_p, char **argv, int cnt)
+{
+	int i;
+	char **tmp = xmalloc(sizeof(char *) * cnt);
+
+	for (i=0; i<cnt; i++)
+		tmp[i] = xstrdup(argv[i]);
+
+	*argv_p = tmp;
+}
+
+
+/* parallel (multi-threaded) task launch;
+ * transmits all RPCs in parallel with timeout */
+static int _p_launch(slurm_msg_t *req, slurm_step_ctx ctx)
+{
+	int rc = SLURM_SUCCESS, i;
+	thd_t *thd;
+
+	thd = xmalloc(sizeof(thd_t) * ctx->num_tasks);
+	if (thd == NULL) {
+		slurm_seterrno(ENOMEM);
+		return SLURM_ERROR;
+	}
+
+	for (i=0; i<ctx->num_tasks; i++) {
+		thd[i].state = DSH_NEW;
+		thd[i].req   = &req[i];
+	}
+
+	/* start all threads (up to _MAX_THREAD_COUNT active at one time) */
+	for (i=0; i<ctx->num_tasks; i++) {
+		/* wait until there is room for another active thread */
+		slurm_mutex_lock(&thread_mutex);
+		while (threads_active >= _MAX_THREAD_COUNT) {
+			pthread_cond_wait(&thread_cond, &thread_mutex);
+		}
+
+		slurm_attr_init(&thd[i].attr);
+		(void) pthread_attr_setdetachstate(&thd[i].attr,
+						PTHREAD_CREATE_DETACHED);
+		while ((rc = pthread_create(&thd[i].thread, &thd[i].attr,
+					    _thread_per_node_rpc,
+					    (void *) &thd[i]))) {
+			if (threads_active)
+				pthread_cond_wait(&thread_cond, &thread_mutex);
+			else {
+				slurm_mutex_unlock(&thread_mutex);
+				sleep(1);
+				slurm_mutex_lock(&thread_mutex);
+			}
+		}
+
+		threads_active++;
+		slurm_mutex_unlock(&thread_mutex);
+	}
+
+	/* wait for all tasks to terminate */
+	slurm_mutex_lock(&thread_mutex);
+	for (i=0; i<ctx->num_tasks; i++) {
+		while (thd[i].state < DSH_DONE) {
+			/* wait until another thread completes */
+			pthread_cond_wait(&thread_cond, &thread_mutex);
+		}
+	}
+	slurm_mutex_unlock(&thread_mutex);
+
+	xfree(thd);
+	return rc;
+}
+
+
+/*
+ * _thread_per_node_rpc - thread to issue an RPC to a single node
+ * IN/OUT args - pointer to thd_t entry
+ */
+static void *_thread_per_node_rpc(void *args)
+{
+	int rc;
+	thd_t *thread_ptr = (thd_t *) args;
+	state_t new_state;
+
+	thread_ptr->tstart = time(NULL);
+	thread_ptr->state  = DSH_ACTIVE;
+
+	if (slurm_send_recv_rc_msg(thread_ptr->req, &rc, 0) < 0) {
+		new_state = DSH_FAILED;
+		goto cleanup;
+	}
+
+	switch (rc) {
+	case SLURM_SUCCESS:
+		new_state = DSH_DONE;
+		break;
+	default:
+		slurm_seterrno(rc);
+		new_state = DSH_FAILED;
+	}
+
+      cleanup:
+	slurm_mutex_lock(&thread_mutex);
+	thread_ptr->state = new_state;
+	threads_active--;
+	/* Signal completion so another thread can replace us */
+	slurm_mutex_unlock(&thread_mutex);
+	pthread_cond_signal(&thread_cond);
+
+	return (void *) NULL;
+}
diff --git a/src/common/slurm_protocol_defs.c b/src/common/slurm_protocol_defs.c
index 7425ca48bc7..c4920b7a9b9 100644
--- a/src/common/slurm_protocol_defs.c
+++ b/src/common/slurm_protocol_defs.c
@@ -282,7 +282,6 @@ void slurm_free_launch_tasks_request_msg(launch_tasks_request_msg_t * msg)
 
 	if (msg == NULL)
 		return;
-
 	slurm_cred_destroy(msg->cred);
 
 	if (msg->env) {
@@ -309,6 +308,34 @@ void slurm_free_launch_tasks_request_msg(launch_tasks_request_msg_t * msg)
 	xfree(msg);
 }
 
+void slurm_free_spawn_task_request_msg(spawn_task_request_msg_t * msg)
+{
+	int i;
+
+	if (msg == NULL)
+		return;
+
+	slurm_cred_destroy(msg->cred);
+
+	if (msg->env) {
+		for (i = 0; i < msg->envc; i++) {
+			xfree(msg->env[i]);
+		}
+		xfree(msg->env);
+	}
+	xfree(msg->cwd);
+	if (msg->argv) {
+		for (i = 0; i < msg->argc; i++) {
+			xfree(msg->argv[i]);
+		}
+		xfree(msg->argv);
+	}
+
+	if (msg->switch_job)
+		switch_free_jobinfo(msg->switch_job);
+
+	xfree(msg);
+}
+
 void slurm_free_reattach_tasks_request_msg(reattach_tasks_request_msg_t *msg)
 {
 	if (msg) {
diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h
index 38196d0bd58..fbbe5d1961b 100644
--- a/src/common/slurm_protocol_defs.h
+++ b/src/common/slurm_protocol_defs.h
@@ -48,7 +48,8 @@
 
 #include "src/common/xassert.h"
 
-/* used to define flags of the launch_tasks_request_msg_t.task_flags
+/* used to define flags of the launch_tasks_request_msg_t and
+ * spawn_task_request_msg_t task_flags fields
  */
 enum task_flag_vals {
 	TASK_PARALLEL_DEBUG = 0x1,
@@ -133,6 +134,7 @@ typedef enum {
 	REQUEST_KILL_TIMELIMIT,
 	REQUEST_KILL_JOB,
 	MESSAGE_EPILOG_COMPLETE,
+	REQUEST_SPAWN_TASK,
 
 	SRUN_PING = 7001,
 	SRUN_TIMEOUT,
@@ -277,6 +279,29 @@ typedef struct launch_tasks_response_msg {
 	uint32_t *local_pids;
 } launch_tasks_response_msg_t;
 
+typedef struct spawn_task_request_msg {
+	uint32_t  job_id;
+	uint32_t  job_step_id;
+	uint32_t  nnodes;	/* number of nodes in this job step */
+	uint32_t  nprocs;	/* number of processes in this job step */
+	uint32_t  uid;
+	uint32_t  srun_node_id;	/* node id of this node (relative to job) */
+	uint16_t  envc;
+	uint16_t  argc;
+	uint16_t  cpus_allocated;
+	char    **env;
+	char    **argv;
+	char     *cwd;
+	uint16_t  io_port;
+	uint16_t  task_flags;
+	uint32_t  global_task_id;
+
+	int32_t   slurmd_debug;	/* remote slurmd debug level */
+
+	slurm_cred_t cred;	/* job credential */
+	switch_jobinfo_t switch_job;	/* switch credential for the job */
+} spawn_task_request_msg_t;
+
 typedef struct task_ext_msg {
 	uint32_t num_tasks;
 	uint32_t *task_id_list;
@@ -424,6 +449,8 @@
 void inline slurm_free_launch_tasks_request_msg(launch_tasks_request_msg_t *
 						msg);
 void inline slurm_free_launch_tasks_response_msg(launch_tasks_response_msg_t *
 						 msg);
+void inline slurm_free_spawn_task_request_msg(spawn_task_request_msg_t *
+					      msg);
 void inline slurm_free_task_exit_msg(task_exit_msg_t * msg);
 void inline slurm_free_kill_tasks_msg(kill_tasks_msg_t * msg);
 
diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c
index 815f4ed9e31..16a20ac176b 100644
--- a/src/common/slurm_protocol_pack.c
+++ b/src/common/slurm_protocol_pack.c
@@ -128,6 +128,12 @@ static void _pack_launch_tasks_request_msg(launch_tasks_request_msg_t *
 static int _unpack_launch_tasks_request_msg(launch_tasks_request_msg_t **
 					    msg_ptr, Buf buffer);
 
+
+static void _pack_spawn_task_request_msg(spawn_task_request_msg_t *
+					 msg, Buf buffer);
+static int _unpack_spawn_task_request_msg(spawn_task_request_msg_t **
+					  msg_ptr, Buf buffer);
+
 static void _pack_cancel_tasks_msg(kill_tasks_msg_t * msg, Buf buffer);
 static int _unpack_cancel_tasks_msg(kill_tasks_msg_t ** msg_ptr,
 				    Buf buffer);
@@ -372,6 +378,11 @@ pack_msg(slurm_msg_t const *msg, Buf buffer)
 		_pack_launch_tasks_response_msg((launch_tasks_response_msg_t *)
 						msg->data, buffer);
 		break;
+	case REQUEST_SPAWN_TASK:
+		_pack_spawn_task_request_msg(
+					(spawn_task_request_msg_t *)
+					msg->data, buffer);
+		break;
 	case REQUEST_KILL_TASKS:
 		_pack_cancel_tasks_msg((kill_tasks_msg_t *) msg->data,
 				       buffer);
@@ -590,6 +601,11 @@ unpack_msg(slurm_msg_t * msg, Buf buffer)
 				(launch_tasks_response_msg_t **)
 				& (msg->data), buffer);
 		break;
+	case REQUEST_SPAWN_TASK:
+		rc = _unpack_spawn_task_request_msg(
+					(spawn_task_request_msg_t **)
+					& (msg->data), buffer);
+		break;
 	case REQUEST_REATTACH_TASKS:
 		rc = _unpack_reattach_tasks_request_msg(
 				(reattach_tasks_request_msg_t **) & msg->data,
@@ -2208,6 +2224,69 @@ _unpack_launch_tasks_request_msg(launch_tasks_request_msg_t **
 	return SLURM_ERROR;
 }
 
+static void
+_pack_spawn_task_request_msg(spawn_task_request_msg_t * msg, Buf buffer)
+{
+	pack32(msg->job_id, buffer);
+	pack32(msg->job_step_id, buffer);
+	pack32(msg->nnodes, buffer);
+	pack32(msg->nprocs, buffer);
+	pack32(msg->uid, buffer);
+	pack32(msg->srun_node_id, buffer);
+	slurm_cred_pack(msg->cred, buffer);
+	packstr_array(msg->env, msg->envc, buffer);
+	packstr(msg->cwd, buffer);
+	packstr_array(msg->argv, msg->argc, buffer);
+	pack16(msg->io_port, buffer);
+	pack16(msg->task_flags, buffer);
+	pack16(msg->cpus_allocated, buffer);
+	pack32(msg->slurmd_debug, buffer);
+	pack32(msg->global_task_id, buffer);
+	switch_pack_jobinfo(msg->switch_job, buffer);
+}
+
+static int
+_unpack_spawn_task_request_msg(spawn_task_request_msg_t **
+			       msg_ptr, Buf buffer)
+{
+	uint16_t uint16_tmp;
+	spawn_task_request_msg_t *msg;
+
+	msg = xmalloc(sizeof(spawn_task_request_msg_t));
+	*msg_ptr = msg;
+
+	safe_unpack32(&msg->job_id, buffer);
+	safe_unpack32(&msg->job_step_id, buffer);
+	safe_unpack32(&msg->nnodes, buffer);
+	safe_unpack32(&msg->nprocs, buffer);
+	safe_unpack32(&msg->uid, buffer);
+	safe_unpack32(&msg->srun_node_id, buffer);
+	if (!(msg->cred = slurm_cred_unpack(buffer)))
+		goto unpack_error;
+	safe_unpackstr_array(&msg->env, &msg->envc, buffer);
+	safe_unpackstr_xmalloc(&msg->cwd, &uint16_tmp, buffer);
+	safe_unpackstr_array(&msg->argv, &msg->argc, buffer);
+	safe_unpack16(&msg->io_port, buffer);
+	safe_unpack16(&msg->task_flags, buffer);
+	safe_unpack16(&msg->cpus_allocated, buffer);
+	safe_unpack32(&msg->slurmd_debug, buffer);
+	safe_unpack32(&msg->global_task_id, buffer);
+
+	switch_alloc_jobinfo(&msg->switch_job);
+	if (switch_unpack_jobinfo(msg->switch_job, buffer) < 0) {
+		error("switch_unpack_jobinfo: %m");
+		switch_free_jobinfo(msg->switch_job);
+		goto unpack_error;
+	}
+
+	return SLURM_SUCCESS;
+
+      unpack_error:
+	slurm_free_spawn_task_request_msg(msg);
+	*msg_ptr = NULL;
+	return SLURM_ERROR;
+}
+
 static void
 _pack_cancel_tasks_msg(kill_tasks_msg_t * msg, Buf buffer)
 {
diff --git a/src/slurmd/job.c b/src/slurmd/job.c
index 0f0efaac024..b0d06fc061b 100644
--- a/src/slurmd/job.c
+++ b/src/slurmd/job.c
@@ -167,6 +167,77 @@ job_create(launch_tasks_request_msg_t *msg, slurm_addr *cli_addr)
 	return job;
 }
 
+/* create a slurmd job structure from a spawn task message */
+slurmd_job_t *
+job_spawn_create(spawn_task_request_msg_t *msg, slurm_addr *cli_addr)
+{
+	struct passwd *pwd;
+	slurmd_job_t  *job;
+	srun_info_t   *srun;
+	slurm_addr     io_addr;
+
+	xassert(msg != NULL);
+
+	debug3("entering job_spawn_create");
+
+	if ((pwd = _pwd_create((uid_t)msg->uid)) == NULL) {
+		error("uid %ld not found on system", (long) msg->uid);
+		slurm_seterrno (ESLURMD_UID_NOT_FOUND);
+		return NULL;
+	}
+	job = xmalloc(sizeof(*job));
+
+	job->jobid   = msg->job_id;
+	job->stepid  = msg->job_step_id;
+	job->uid     = (uid_t) msg->uid;
+	job->pwd     = pwd;
+	job->gid     = job->pwd->pw_gid;
+	job->nprocs  = msg->nprocs;
+	job->nnodes  = msg->nnodes;
+	job->nodeid  = msg->srun_node_id;
+	job->ntasks  = 1;	/* a spawn request always launches one task */
+	job->debug   = msg->slurmd_debug;
+	job->cpus    = msg->cpus_allocated;
+
+	job->timelimit  = (time_t) -1;
+	job->task_flags = msg->task_flags;
+	job->spawn_task = true;
+
+	job->env  = _array_copy(msg->envc, msg->env);
+	job->argc = msg->argc;
+	job->argv = _array_copy(job->argc, msg->argv);
+
+	job->cwd  = xstrdup(msg->cwd);
+
+	memcpy(&io_addr, cli_addr, sizeof(slurm_addr));
+	slurm_set_addr(&io_addr, msg->io_port, NULL);
+
+	job->switch_job = msg->switch_job;
+
+	job->objs = list_create((ListDelF) io_obj_destroy);
+	job->eio  = eio_handle_create();
+
+	srun = srun_info_create(msg->cred, NULL, &io_addr);
+
+	job->sruns = list_create((ListDelF) _srun_info_destructor);
+
+	list_append(job->sruns, (void *) srun);
+
+	_job_init_task_info(job, &(msg->global_task_id));
+
+	if (pipe(job->fdpair) < 0) {
+		error("pipe: %m");
+		return NULL;
+	}
+
+	fd_set_close_on_exec(job->fdpair[0]);
+	fd_set_close_on_exec(job->fdpair[1]);
+
+	job->smgr_status = -1;
+
+	return job;
+}
+
 /*
  * return the default output filename for a batch job
  */
@@ -344,9 +415,6 @@ srun_info_create(slurm_cred_t cred, slurm_addr *resp_addr, slurm_addr *ioaddr)
 	srun_key_t *key    = xmalloc(sizeof(*key ));
 
 	srun->key    = key;
-	srun->ofname = NULL;
-	srun->efname = NULL;
-	srun->ifname = NULL;
 
 	/*
 	 * If no credential was provided, return the empty
diff --git a/src/slurmd/job.h b/src/slurmd/job.h
index baff2684613..b71107761b8 100644
--- a/src/slurmd/job.h
+++ b/src/slurmd/job.h
@@ -116,6 +116,7 @@ typedef struct slurmd_job {
 
 	bool           batch;      /* true if this is a batch job     */
 	bool           run_prolog; /* true if need to run prolog      */
+	bool           spawn_task; /* stand-alone task                */
 	time_t         timelimit;  /* time at which job must stop     */
 
 	struct passwd *pwd;   /* saved passwd struct for user job */
@@ -139,6 +140,7 @@ typedef struct slurmd_job {
 
 slurmd_job_t * job_create(launch_tasks_request_msg_t *msg, slurm_addr *client);
 slurmd_job_t * job_batch_job_create(batch_job_launch_msg_t *msg);
+slurmd_job_t * job_spawn_create(spawn_task_request_msg_t *msg, slurm_addr *client);
 
 void job_kill(slurmd_job_t *job, int signal);
 
diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c
index 329a06b96ad..e8a21843bf6 100644
--- a/src/slurmd/mgr.c
+++ b/src/slurmd/mgr.c
@@ -45,6 +45,7 @@
 #include <unistd.h>
 #include <pwd.h>
 #include <grp.h>
+#include <stdio.h>
 #include <string.h>
 
 #if HAVE_STDLIB_H
@@ -112,6 +113,7 @@ static void _send_launch_failure(launch_tasks_request_msg_t *,
 static int  _job_mgr(slurmd_job_t *job);
 static void _set_job_log_prefix(slurmd_job_t *job);
 static int  _setup_io(slurmd_job_t *job);
+static int  _setup_spawn_io(slurmd_job_t *job);
 static int  _drop_privileges(struct passwd *pwd);
 static int  _reclaim_privileges(struct passwd *pwd);
 static void _send_launch_resp(slurmd_job_t *job, int rc);
@@ -224,7 +226,31 @@ mgr_launch_batch_job(batch_job_launch_msg_t *msg, slurm_addr *cli)
 	return 0;
 }
 
+/*
+ * Spawn a task / job step on the current node
+ */
+int
+mgr_spawn_task(spawn_task_request_msg_t *msg, slurm_addr *cli)
+{
+	slurmd_job_t *job = NULL;
+
+	if (!(job = job_spawn_create(msg, cli)))
+		return SLURM_ERROR;
+
+	job->spawn_task = true;
+	_set_job_log_prefix(job);
+
+	_setargs(job);
+	_set_launch_ip_in_env(job, cli);
+
+	if (_job_mgr(job) < 0)
+		return SLURM_ERROR;
+
+	job_destroy(job);
+
+	return SLURM_SUCCESS;
+}
 
 /*
  * Run a prolog or epilog script.
@@ -346,6 +372,23 @@ _setup_io(slurmd_job_t *job)
 	return SLURM_SUCCESS;
 }
 
+
+static int
+_setup_spawn_io(slurmd_job_t *job)
+{
+	_slurmd_job_log_init(job);
+
+#ifndef NDEBUG
+#  ifdef PR_SET_DUMPABLE
+	if (prctl(PR_SET_DUMPABLE, 1) < 0)
+		debug ("Unable to set dumpable to 1");
+#  endif /* PR_SET_DUMPABLE */
+#endif /* !NDEBUG */
+
+	return SLURM_SUCCESS;
+}
+
+
 static void
 _random_sleep(slurmd_job_t *job)
 {
@@ -437,7 +480,11 @@ _job_mgr(slurmd_job_t *job)
 	xsignal_block(mgr_sigarray);
 	xsignal(SIGHUP, _hup_handler);
 
-	if ((rc = _setup_io(job)))
+	if (job->spawn_task)
+		rc = _setup_spawn_io(job);
+	else
+		rc = _setup_io(job);
+	if (rc)
 		goto fail2;
 
 	/*
@@ -453,8 +500,7 @@ _job_mgr(slurmd_job_t *job)
 	/*
 	 * Send job launch response with list of pids
 	 */
-	if (!job->batch)
-		_send_launch_resp(job, 0);
+	_send_launch_resp(job, 0);
 
 	/*
 	 * Wait for all tasks to exit
@@ -489,10 +535,10 @@ _job_mgr(slurmd_job_t *job)
 		error("interconnect_postfini: %m");
 
 	/*
-	 * Wait for io thread to complete
+	 * Wait for io thread to complete (if there is one)
 	 */
-	_wait_for_io(job);
-
+	if (!job->spawn_task)
+		_wait_for_io(job);
 
 	job_update_state(job, SLURMD_JOB_COMPLETE);
 
@@ -503,12 +549,13 @@ _job_mgr(slurmd_job_t *job)
 	/* If interactive job startup was abnormal,
 	 * be sure to notify client.
	 */
-	if ((rc != 0) && !job->batch)
+	if (rc != 0)
 		_send_launch_resp(job, rc);
 
 	return(rc);
 }
 
+
 /*
  * update task information from "job" into shared memory
  */
@@ -983,6 +1030,9 @@ _send_launch_resp(slurmd_job_t *job, int rc)
 	launch_tasks_response_msg_t resp;
 	srun_info_t *srun = list_peek(job->sruns);
 
+	if (job->batch || job->spawn_task)
+		return;
+
 	debug("Sending launch resp rc=%d", rc);
 
 	resp_msg.address      = srun->resp_addr;
@@ -1109,14 +1159,13 @@ _reclaim_privileges(struct passwd *pwd)
 }
 
 
-
-
 static void
 _slurmd_job_log_init(slurmd_job_t *job)
 {
 	char argv0[64];
 
-	conf->log_opts.buffered = 1;
+	if (!job->spawn_task)
+		conf->log_opts.buffered = 1;
 
 	/*
 	 * Reset stderr logging to user requested level
@@ -1133,14 +1182,14 @@ _slurmd_job_log_init(slurmd_job_t *job)
 	log_set_argv0(argv0);
 
 	/* Connect slurmd stderr to job's stderr */
-	if (dup2(job->task[0]->perr[1], STDERR_FILENO) < 0) {
+	if ((!job->spawn_task) &&
+	    (dup2(job->task[0]->perr[1], STDERR_FILENO) < 0)) {
 		error("job_log_init: dup2(stderr): %m");
 		return;
 	}
 }
 
-
 static void
 _setargs(slurmd_job_t *job)
 {
diff --git a/src/slurmd/mgr.h b/src/slurmd/mgr.h
index eb36e9eabf6..db667ea244f 100644
--- a/src/slurmd/mgr.h
+++ b/src/slurmd/mgr.h
@@ -34,6 +34,10 @@
 
 #include "src/slurmd/job.h"
 
+/* Spawn a task / job step on this node
+ */
+int mgr_spawn_task(spawn_task_request_msg_t *msg, slurm_addr *client);
+
 /* Launch a job step on this node
  */
 int mgr_launch_tasks(launch_tasks_request_msg_t *msg, slurm_addr *client);
diff --git a/src/slurmd/req.c b/src/slurmd/req.c
index f00dafe0ba7..4bfd334ac67 100644
--- a/src/slurmd/req.c
+++ b/src/slurmd/req.c
@@ -64,6 +64,7 @@ static bool _job_still_running(uint32_t job_id);
 static int  _kill_all_active_steps(uint32_t jobid, int sig);
 static int  _launch_tasks(launch_tasks_request_msg_t *, slurm_addr *);
 static void _rpc_launch_tasks(slurm_msg_t *, slurm_addr *);
+static void _rpc_spawn_task(slurm_msg_t *, slurm_addr *);
 static void _rpc_batch_job(slurm_msg_t *, slurm_addr *);
 static void _rpc_kill_tasks(slurm_msg_t *, slurm_addr *);
 static void _rpc_timelimit(slurm_msg_t *, slurm_addr *);
@@ -76,6 +77,7 @@ static void _rpc_pid2jid(slurm_msg_t *msg, slurm_addr *);
 static int  _rpc_ping(slurm_msg_t *, slurm_addr *);
 static int  _run_prolog(uint32_t jobid, uid_t uid);
 static int  _run_epilog(uint32_t jobid, uid_t uid);
+static int  _spawn_task(spawn_task_request_msg_t *, slurm_addr *);
 
 static bool _pause_for_job_completion (uint32_t jobid, int maxtime);
 static int  _waiter_init (uint32_t jobid);
@@ -106,6 +108,12 @@ slurmd_req(slurm_msg_t *msg, slurm_addr *cli)
 		slurm_free_launch_tasks_request_msg(msg->data);
 		slurm_mutex_unlock(&launch_mutex);
 		break;
+	case REQUEST_SPAWN_TASK:
+		slurm_mutex_lock(&launch_mutex);
+		_rpc_spawn_task(msg, cli);
+		slurm_free_spawn_task_request_msg(msg->data);
+		slurm_mutex_unlock(&launch_mutex);
+		break;
 	case REQUEST_KILL_TASKS:
 		_rpc_kill_tasks(msg, cli);
 		slurm_free_kill_tasks_msg(msg->data);
@@ -268,7 +276,18 @@ _launch_tasks(launch_tasks_request_msg_t *req, slurm_addr *cli)
 
 	return (retval <= 0) ? retval : 0;
 }
-
+
+
+static int
+_spawn_task(spawn_task_request_msg_t *req, slurm_addr *cli)
+{
+	int retval;
+
+	if ((retval = _fork_new_slurmd()) == 0)
+		exit (mgr_spawn_task(req, cli));
+
+	return (retval <= 0) ? retval : 0;
+}
+
 static int
 _check_job_credential(slurm_cred_t cred, uint32_t jobid,
 		      uint32_t stepid, uid_t uid)
@@ -400,6 +419,74 @@ _rpc_launch_tasks(slurm_msg_t *msg, slurm_addr *cli)
 }
 
 
+static void
+_rpc_spawn_task(slurm_msg_t *msg, slurm_addr *cli)
+{
+	int      errnum = 0;
+	uint16_t port;
+	char     host[MAXHOSTNAMELEN];
+	uid_t    req_uid;
+	spawn_task_request_msg_t *req = msg->data;
+	uint32_t jobid  = req->job_id;
+	uint32_t stepid = req->job_step_id;
+	bool     super_user = false, run_prolog = false;
+
+	req_uid = g_slurm_auth_get_uid(msg->cred);
+
+	super_user = _slurm_authorized_user(req_uid);
+
+	if ((super_user == false) && (req_uid != req->uid)) {
+		error("spawn task request from uid %u",
+		      (unsigned int) req_uid);
+		errnum = ESLURM_USER_ID_MISSING;	/* or invalid user */
+		goto done;
+	}
+
+	slurmd_get_addr(cli, &port, host, sizeof(host));
+	info("spawn task %u.%u request from %u@%s", req->job_id,
+	     req->job_step_id, req->uid, host);
+
+	if (!slurm_cred_jobid_cached(conf->vctx, req->job_id))
+		run_prolog = true;
+
+	if (_check_job_credential(req->cred, jobid, stepid, req_uid) < 0) {
+		errnum = errno;
+		error("Invalid job credential from %ld@%s: %m",
+		      (long) req_uid, host);
+		goto done;
+	}
+
+	/* xassert(slurm_cred_jobid_cached(conf->vctx, req->job_id)); */
+
+	/* Run job prolog if necessary */
+	if (run_prolog && (_run_prolog(req->job_id, req->uid) != 0)) {
+		error("[job %u] prolog failed", req->job_id);
+		errnum = ESLURMD_PROLOG_FAILED;
+		goto done;
+	}
+
+	if (_spawn_task(req, cli) < 0)
+		errnum = errno;
+
+    done:
+	if (slurm_send_rc_msg(msg, errnum) < 0) {
+
+		error("spawn_task: unable to send return code: %m");
+
+		/*
+		 * Rewind credential so that srun may perform retry
+		 */
+		slurm_cred_rewind(conf->vctx, req->cred); /* ignore errors */
+
+	} else if (errnum == SLURM_SUCCESS)
+		save_cred_state(conf->vctx);
+
+	/*
+	 * If job prolog failed, indicate failure to slurmctld
+	 */
+	if (errnum == ESLURMD_PROLOG_FAILED)
+		send_registration_msg(errnum);
+}
+
 static void
 _rpc_batch_job(slurm_msg_t *msg, slurm_addr *cli)
 {
diff --git a/src/slurmd/smgr.c b/src/slurmd/smgr.c
index 604bac3503d..37b47b864b0 100644
--- a/src/slurmd/smgr.c
+++ b/src/slurmd/smgr.c
@@ -85,6 +85,7 @@ static int    _send_exit_status(slurmd_job_t *job, pid_t pid, int status);
 static char * _signame(int signo);
 static void   _cleanup_file_descriptors(slurmd_job_t *job);
 static int    _setup_env(slurmd_job_t *job, int taskid);
+static void   _setup_spawn_io(slurmd_job_t *job);
 
 /* parallel debugger support */
 static void   _pdebug_trace_process(slurmd_job_t *job, pid_t pid);
@@ -166,7 +167,7 @@ _session_mgr(slurmd_job_t *job)
 		}
 	}
 
-	if (set_user_limits(job) < 0) {
+	if ((!job->spawn_task) && (set_user_limits(job) < 0)) {
 		debug("Unable to set user limits");
 		exit(5);
 	}
@@ -197,6 +198,28 @@ _session_mgr(slurmd_job_t *job)
 	exit(SLURM_SUCCESS);
 }
 
+static void _setup_spawn_io(slurmd_job_t *job)
+{
+	srun_info_t *srun;
+	int fd = -1;
+
+	srun = list_peek(job->sruns);
+	xassert(srun);
+	if ((fd = (int) slurm_open_stream(&srun->ioaddr)) < 0) {
+		error("connect io: %m");
+		exit(1);
+	}
+	fd_set_nonblocking(fd);
+	(void) close(STDIN_FILENO);
+	(void) close(STDOUT_FILENO);
+	(void) close(STDERR_FILENO);
+	if ((dup(fd) != 0) || (dup(fd) != 1) || (dup(fd) != 2)) {
+		error("dup: %m");
+		exit(1);
+	}
+	(void) close(fd);
+}
+
 /* Close write end of stdin (at the very least)
  */
 static void
@@ -316,7 +339,10 @@ _exec_task(slurmd_job_t *job, int i)
 	 * If io_prepare_child() is moved above interconnect_attach()
 	 * this causes EBADF from qsw_attach(). Why?
	 */
-	io_prepare_child(job->task[i]);
+	if (job->spawn_task)
+		_setup_spawn_io(job);
+	else
+		io_prepare_child(job->task[i]);
 
 	execve(job->argv[0], job->argv, job->env);
-- 
GitLab
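
Editor's note: the man page EXAMPLE above declares _build_socket_array() and
_do_task_work() but never defines them. The following is a minimal sketch of
what such helpers might look like. It assumes only the I/O model the patch
itself implements (slurm_spawn() binds each fd_array socket to an ephemeral
port and listens; each spawned task connects back, and its stdio travels over
that stream), plus standard POSIX calls; it additionally needs
<sys/socket.h>. The names and error handling are illustrative, not part of
the patch.

/* Create one TCP socket per task. slurm_spawn() will bind each
 * socket to an ephemeral port and listen on it; the spawned task
 * then connects back to that port. */
static int *_build_socket_array(int tasks)
{
	int *fd_array = malloc(tasks * sizeof(int));
	int i;

	if (fd_array == NULL)
		exit(1);
	for (i = 0; i < tasks; i++) {
		if ((fd_array[i] = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
			perror("socket");
			exit(1);
		}
	}
	return fd_array;
}

/* Accept each task's connection in task id order and copy the
 * task's output to our stdout until the task closes its socket. */
static void _do_task_work(int *fd_array, int tasks)
{
	char buf[1024];
	ssize_t n;
	int i, conn;

	for (i = 0; i < tasks; i++) {
		if ((conn = accept(fd_array[i], NULL, NULL)) < 0) {
			perror("accept");
			continue;
		}
		while ((n = read(conn, buf, sizeof(buf))) > 0)
			(void) write(STDOUT_FILENO, buf, (size_t) n);
		(void) close(conn);
	}
}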